HK仅輝 发表于 2022-4-18 14:40

Jeecg Boot webSocket推送报并发冲突错误怎么加锁

本帖最后由 HK仅輝 于 2022-4-18 15:40 编辑

怎么加锁,‘服务器端推送消息pushMessage()’(94行)方法中报的错误

      at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
      at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
package org.jeecg.modules.message.websocket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
* @AuThor scott
* @date 2019/11/29 9:41
* @Description: 此注解相当于设置访问URL
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
public class WebSocket {

    private Session session;

    private String userId;

    private static final String REDIS_TOPIC_NAME = "socketHandler";

    @Resource
    private JeecgRedisClient jeecgRedisClient;

    /**
   * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
   */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    private static Map<String, Session> sessionPool = new HashMap<String, Session>();


    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
      try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
      } catch (Exception e) {
      }
    }

    @OnClose
    public void onClose() {
      try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
      } catch (Exception e) {
      }
    }


    /**
   * 服务端推送消息
   *
   * @param userId
   * @param message
   */
    public void pushMessage(String userId, String message) {
      Session session = sessionPool.get(userId);
      if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
      }
    }

    /**
   * 服务器端推送消息
   */
    public void pushMessage(String message) {
      try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
      } catch (Exception e) {
            e.printStackTrace();
      }
    }


    @OnMessage
    public void onMessage(String message) {
      //todo 现在有个定时任务刷,应该去掉
      log.debug("【websocket消息】收到客户端消息:" + message);
      JSONObject obj = new JSONObject();
      //业务类型
      obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
      //消息内容
      obj.put(WebsocketConst.MSG_TXT, "心跳响应");
      for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(message);
      }
    }

    /**
   * 后台发送消息到redis
   *
   * @param message
   */
    public void sendMessage(String message) {
      log.info("【websocket消息】广播消息:" + message);
      BaseMap baseMap = new BaseMap();
      baseMap.put("userId", "");
      baseMap.put("message", message);
      jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
   * 此为单点消息
   *
   * @param userId
   * @param message
   */
    public void sendMessage(String userId, String message) {
      BaseMap baseMap = new BaseMap();
      baseMap.put("userId", userId);
      baseMap.put("message", message);
      jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
   * 此为单点消息(多人)
   *
   * @param userIds
   * @param message
   */
    public void sendMessage(String[] userIds, String message) {
      for (String userId : userIds) {
            sendMessage(userId, message);
      }
    }

}


java.lang.IllegalStateException: The remote endpoint was in state which is an invalid state for called method
      at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1258)
      at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1220)
      at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByCompletion(WsRemoteEndpointImplBase.java:210)
      at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByFuture(WsRemoteEndpointImplBase.java:198)
      at org.apache.tomcat.websocket.WsRemoteEndpointAsync.sendText(WsRemoteEndpointAsync.java:53)
      at org.jeecg.modules.message.websocket.WebSocket.lambda$pushMessage$0(WebSocket.java:96)
      at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895)
      at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
      at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
      at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
      at sun.reflect.GeneratedMethodAccessor1426.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80)
      at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:415)
      at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:129)
      at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:515)
      at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:301)
      at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
      at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:85)
      at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:183)
      at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:162)
      at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:156)
      at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60)
      at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59)
      at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:887)
      at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1684)
      at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
      at java.lang.Thread.run(Thread.java:748)



/│\云。 发表于 2022-4-18 15:50

webSockets.forEach(ws -> {
                              synchronized(ws.session) {
                                        try {
                                        ws.session.getAsyncRemote().sendText(message)
                                       } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                              }
                              
                        });


这样试看看不过尽量不这样循环
页: [1]
查看完整版本: Jeecg Boot webSocket推送报并发冲突错误怎么加锁