ppgjx 发表于 2022-11-13 20:33

java websocket 并发链接问题

最近在做用户打开app后建立一个websocket 我是采用这样的逻辑

websocket有两个基本事件回调 链接建立完毕 链接断开

我采用的逻辑是 使用一个ConcurrentHashMap 维护这个在线人数 key是用户id   value是websocket的Session会话

public static final Map<String, Session> clients = new ConcurrentHashMap<>();


下面是各个事件的伪代码

链接建立完毕:
1. 客户端带着用户的token建立链接
2. 服务器端根据用户token判断是否校验通过
3. 如果服务端校验token不通过则断开链接(Session会话)如果校验token通过 则判断这个用户是否已经存在ConcurrentHashMap 如果已经存在则删除已存在的key和关闭对应的链接(Session会话)
4. 把用户信息和Session会话塞进ConcurrentHashMap中

链接断开: 从ConcurrentHashMap中删除这个用户信息

以上的代码逻辑普通操作是没问题的,如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十个线程 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题

根据我得代码逻辑 没有并发的情况下 websocket的每个session会话都对应着ConcurrentHashMap存着的用户信息 没用的session会被销毁

在并发很高的情况下 就会造成有些session没有被销毁 也不存在ConcurrentHashMap里面 这就会造成内存泄露吧 这种问题怎么解决呢 虽然ConcurrentHashMap 是线程安全的 但是业务代码并不是线程安全的 如果加锁非常影响性能





个快快 发表于 2022-11-13 21:15

不清楚你的线程模型是怎么样的,是每次创建完连接随机选一个线程去处理消息嘛?
大致有如下几个方案解决你的这个顶号数据问题:
1. 加锁。网上各种资料确实会告诉你加锁影响性能,但你这个场景仅仅是登录的时候加锁,无非是登陆过程稍慢一些,可以想想把加锁范围降到最小,然后用spinLock,不会对你整体性能有太大影响。
2. 把登陆流程给串行化。怎么做呢?就是把登录相关的流程、消息,全丢给一个特定的线程去干。怎么丢消息,那个特定线程怎么安全的取消息,阻塞队列?大概率也会用到类似于锁的概念。
3. 思考下你的业务是否真的需要高并发,如果不需要,换单线程模型,i/o复用在单线程下表现也不俗。

萋小磊 发表于 2022-11-13 22:05

如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十个线程 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题

根据我得代码逻辑 没有并发的情况下 websocket的每个session会话都对应着ConcurrentHashMap存着的用户信息 没用的session会被销毁

其实你的 Lock 点没必要那么大

伪代码
func login () {

   // 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题
   你只需要 lock 一个 userId的 key 即可比如, 其他用户并不会被锁住
   lock(userId) {
      //
   }
}

另一个向楼上说的一样, 其实连接 是低频操作, 这里你大可忽略不计, 除非你的代码真 emmm 不行


ppgjx 发表于 2022-11-13 23:51

萋小磊 发表于 2022-11-13 22:05
如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十 ...
@ServerEndpoint(path = "/websocket",port = "${testWebsocketPort}",allIdleTimeSeconds = "10")
@Slf4j
public class UserWebSocket{

    //redis
    private staticStringRedisTemplate redisTemplate;

    private String uid;

    private String token;

    @Autowired
    publicvoid setRedisTemplate(StringRedisTemplate redisTemplate) {
      UserWebSocket.redisTemplate = redisTemplate;
    }
    @Autowired
    publicvoid setPxUserService(PxUserService pxUserService) {
      UserWebSocket.pxUserService = pxUserService;
    }


    //用户服务类
    private staticPxUserService pxUserService;


    //保存所有客户连接
    public static final Map<String, WsClient> clients = new ConcurrentHashMap<>();


    //有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @BeforeHandshake
    public void handshake(Session session, HttpHeaders headers) {
      System.out.println("新链接: " +
                "" + session.channel().id().asShortText() + "当前在线: " + clients.size());
      //设置协议
      session.setSubprotocols("stomp");
    }

    //当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @OnOpen
    public void onOpen(Session session, HttpHeaders headers,@RequestParam String uid,@RequestParam String token) {

      synchronized(UserWebSocket.clients){
            log.info(String.format("新用户加入: uid-%s id-%s" ,uid,session.channel().id().asShortText()));
            //是否登录
            if(pxUserService.isLogin(uid,token)){
                //判断是否已连接 已连接则移除
                WsClient client1 = clients.get(uid);
                if(null != client1){
                  client1.getSession().close();
                  clients.remove(uid);
                }
                //登录成功 写入信息
                UserInfo userInfo = pxUserService.getUserInfo(uid);
                WsClient client = new WsClient();
                client.setSession(session);
                client.setUserInfo(userInfo);
                clients.put(uid,client);
                this.uid = uid;
                this.token = token;
            }else {
                //未登录
                sendMsgErr("token错误",session);
                session.close();
            }
      }


    }


    //当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session
    @OnClose
    public void onClose(Session session) throws IOException {
      synchronized(UserWebSocket.clients){
            //清理uid
            removeUser(this.uid);
      }

      System.out.println("链接关闭: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
    }

    //当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable
    @OnError
    public void onError(Session session, Throwable throwable) {
      synchronized(UserWebSocket.clients){
            session.close();
            removeUser(this.uid);
      }
      System.out.println("链接异常: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
      throwable.printStackTrace();
    }

    //当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String
    @OnMessage
    public void onMessage(Session session, String message){
      System.out.println("消息: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
      System.out.println(message);
      WsClient userInfo = clients.get(this.uid);
      if(null != userInfo){
            sendMsgOkAll(userInfo.getUserInfo().getName() + "|" + message);
      }
    }

    //当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object
    @OnEvent
    public void onEvent(Session session,Object evt) {

      synchronized(UserWebSocket.clients){
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                switch (idleStateEvent.state()) {
                  case READER_IDLE:
                        System.out.println("read idle");
                        break;
                  case WRITER_IDLE:
                        System.out.println("write idle");
                        break;
                  case ALL_IDLE:
                        System.out.println("all idle");
                        //如果这个会话等于存入的会话则删除避免无用会话删除了正常的链接
                        WsClient client = clients.get(this.uid);
                        if(null != client && session.equals(client.getSession())){
                            clients.remove(this.uid);
                        }
                        session.close();
                        System.out.println("进入空闲断开链接: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
                        break;
                  default:
                        break;
                }
            }
      }

    }

    /**
   * 成功消息
   * @param ans
   * @param session
   */
    public void sendMsgOk(Object ans,Session session){
      WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
      wsServerAns.setData(ans);
      session.sendText(JSON.toJSONString(ans));
    }

    /**
   * 错误消息
   * @param msg
   * @param session
   */
    public void sendMsgErr(String msg,Session session){
      WsServerAns wsServerAns = new WsServerAns(WsResultCode.ERR.getCode(), msg);

      session.sendText(JSON.toJSONString(wsServerAns));
    }


    /**
   * 广播所有用户
   * @param msg
   */
    public void sendMsgOkAll(String msg){
      WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
      wsServerAns.setData(msg);
      clients.forEach((k,v)->{
            v.getSession().sendText(JSON.toJSONString(wsServerAns));
      });
    }


    public voidremoveUser(String uid){
      if(null != uid){
            clients.remove(this.uid);
      }
    }

}





我尝试过 就算所有代码都加锁 也会有死链接的情况 websocket刚接触 对多线程这块很不熟

萋小磊 发表于 2022-11-14 00:19

ppgjx 发表于 2022-11-13 23:51
@ServerEndpoint(path = "/websocket",port = "${testWebsocketPort}",allIdleTi ...

对 你这个锁有问题,你 sync 是 当前的这个 map.

再者就是 ConcurrentHashMap 是线程安全的, 他内部的方法加锁了的
你直接删除你的 sync 就行
写起来没那么复杂的

ppgjx 发表于 2022-11-14 11:47

萋小磊 发表于 2022-11-14 00:19
对 你这个锁有问题,你 sync 是 当前的这个 map.

再者就是 ConcurrentHashMap 是线程安全的, 他内部 ...

我没理解你的意思 这个map不是私有的 是全部websocket实例共享的一个 加锁其他的新连接应该是阻塞的吧
页: [1]
查看完整版本: java websocket 并发链接问题