吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1039|回复: 5
收起左侧

[求助] java websocket 并发链接问题

[复制链接]
ppgjx 发表于 2022-11-13 20:33
最近在做用户打开app后建立一个websocket 我是采用这样的逻辑

websocket有两个基本事件回调 链接建立完毕 链接断开

我采用的逻辑是 使用一个  ConcurrentHashMap 维护这个在线人数 key是用户id     value是websocket的Session会话

[Asm] 纯文本查看 复制代码
 public static final Map<String, Session> clients = new ConcurrentHashMap<>();


下面是各个事件的伪代码

链接建立完毕:
1. 客户端带着用户的token建立链接
2. 服务器端根据用户token判断是否校验通过
3. 如果服务端校验token不通过则断开链接(Session会话)  如果校验token通过 则判断这个用户是否已经存在ConcurrentHashMap 如果已经存在则删除已存在的key和关闭对应的链接(Session会话)
4. 把用户信息和Session会话塞进ConcurrentHashMap中

链接断开: 从ConcurrentHashMap中删除这个用户信息

以上的代码逻辑普通操作是没问题的,如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十个线程 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题

根据我得代码逻辑 没有并发的情况下 websocket的每个session会话都对应着ConcurrentHashMap存着的用户信息 没用的session会被销毁  

在并发很高的情况下 就会造成有些session没有被销毁 也不存在ConcurrentHashMap里面 这就会造成内存泄露吧 这种问题怎么解决呢 虽然ConcurrentHashMap 是线程安全的 但是业务代码并不是线程安全的 如果加锁非常影响性能





发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

个快快 发表于 2022-11-13 21:15
不清楚你的线程模型是怎么样的,是每次创建完连接随机选一个线程去处理消息嘛?
大致有如下几个方案解决你的这个顶号数据问题:
1. 加锁。网上各种资料确实会告诉你加锁影响性能,但你这个场景仅仅是登录的时候加锁,无非是登陆过程稍慢一些,可以想想把加锁范围降到最小,然后用spinLock,不会对你整体性能有太大影响。
2. 把登陆流程给串行化。怎么做呢?就是把登录相关的流程、消息,全丢给一个特定的线程去干。怎么丢消息,那个特定线程怎么安全的取消息,阻塞队列?大概率也会用到类似于锁的概念。
3. 思考下你的业务是否真的需要高并发,如果不需要,换单线程模型,i/o复用在单线程下表现也不俗。

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
ppgjx + 1 + 1 我很赞同!

查看全部评分

萋小磊 发表于 2022-11-13 22:05
如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十个线程 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题

根据我得代码逻辑 没有并发的情况下 websocket的每个session会话都对应着ConcurrentHashMap存着的用户信息 没用的session会被销毁  

其实你的 Lock 点没必要那么大

伪代码
func login () {

   // 每个线程都用同一个用户的token信息去和服务器进行链接 就会导致线程安全问题
   你只需要 lock 一个 userId  的 key 即可比如, 其他用户并不会被锁住
   lock(userId) {
      //
   }
}

另一个向楼上说的一样, 其实  连接 是低频操作, 这里你大可忽略不计, 除非你的代码真 emmm 不行


免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
ppgjx + 1 + 1 我很赞同!

查看全部评分

 楼主| ppgjx 发表于 2022-11-13 23:51
萋小磊 发表于 2022-11-13 22:05
如果用户在a设备进行链接成功,又在b设备链接,a设备就会被踢掉

但是如果并发太高 我进行了模拟 开了十 ...

[Java] 纯文本查看 复制代码
@ServerEndpoint(path = "/websocket",port = "${testWebsocketPort}",allIdleTimeSeconds = "10")
@Slf4j
public class UserWebSocket{

    //redis
    private static  StringRedisTemplate redisTemplate;

    private String uid;

    private String token;

    @Autowired
    public  void setRedisTemplate(StringRedisTemplate redisTemplate) {
        UserWebSocket.redisTemplate = redisTemplate;
    }
    @Autowired
    public  void setPxUserService(PxUserService pxUserService) {
        UserWebSocket.pxUserService = pxUserService;
    }


    //用户服务类
    private static  PxUserService pxUserService;


    //保存所有客户连接
    public static final Map<String, WsClient> clients = new ConcurrentHashMap<>();


    //有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @BeforeHandshake
    public void handshake(Session session, HttpHeaders headers) {
        System.out.println("新链接: " +
                "" + session.channel().id().asShortText() + "当前在线: " + clients.size());
        //设置协议
        session.setSubprotocols("stomp");
    }

    //当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @OnOpen
    public void onOpen(Session session, HttpHeaders headers,@RequestParam String uid,@RequestParam String token) {

        synchronized(UserWebSocket.clients){
            log.info(String.format("新用户加入: uid-%s id-%s" ,uid,session.channel().id().asShortText()));
            //是否登录
            if(pxUserService.isLogin(uid,token)){
                //判断是否已连接 已连接则移除
                WsClient client1 = clients.get(uid);
                if(null != client1){
                    client1.getSession().close();
                    clients.remove(uid);
                }
                //登录成功 写入信息
                UserInfo userInfo = pxUserService.getUserInfo(uid);
                WsClient client = new WsClient();
                client.setSession(session);
                client.setUserInfo(userInfo);
                clients.put(uid,client);
                this.uid = uid;
                this.token = token;
            }else {
                //未登录
                sendMsgErr("token错误",session);
                session.close();
            }
        }


    }


    //当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session
    @OnClose
    public void onClose(Session session) throws IOException {
        synchronized(UserWebSocket.clients){
            //清理uid
            removeUser(this.uid);
        }

        System.out.println("链接关闭: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
    }

    //当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable
    @OnError
    public void onError(Session session, Throwable throwable) {
        synchronized(UserWebSocket.clients){
            session.close();
            removeUser(this.uid);
        }
        System.out.println("链接异常: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
        throwable.printStackTrace();
    }

    //当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String
    @OnMessage
    public void onMessage(Session session, String message)  {
        System.out.println("消息: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
        System.out.println(message);
        WsClient userInfo = clients.get(this.uid);
        if(null != userInfo){
            sendMsgOkAll(userInfo.getUserInfo().getName() + "|" + message);
        }
    }

    //当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object
    @OnEvent
    public void onEvent(Session session,Object evt) {

        synchronized(UserWebSocket.clients){
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                switch (idleStateEvent.state()) {
                    case READER_IDLE:
                        System.out.println("read idle");
                        break;
                    case WRITER_IDLE:
                        System.out.println("write idle");
                        break;
                    case ALL_IDLE:
                        System.out.println("all idle");
                        //如果这个会话等于存入的会话则删除  避免无用会话删除了正常的链接
                        WsClient client = clients.get(this.uid);
                        if(null != client && session.equals(client.getSession())){
                            clients.remove(this.uid);
                        }
                        session.close();
                        System.out.println("进入空闲断开链接: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
                        break;
                    default:
                        break;
                }
            }
        }

    }

    /**
     * 成功消息
     * @param ans
     * @param session
     */
    public void sendMsgOk(Object ans,Session session){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
        wsServerAns.setData(ans);
        session.sendText(JSON.toJSONString(ans));
    }

    /**
     * 错误消息
     * @param msg
     * @param session
     */
    public void sendMsgErr(String msg,Session session){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.ERR.getCode(), msg);

        session.sendText(JSON.toJSONString(wsServerAns));
    }


    /**
     * 广播所有用户
     * @param msg
     */
    public void sendMsgOkAll(String msg){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
        wsServerAns.setData(msg);
        clients.forEach((k,v)->{
            v.getSession().sendText(JSON.toJSONString(wsServerAns));
        });
    }


    public void  removeUser(String uid){
        if(null != uid){
            clients.remove(this.uid);
        }
    }

}





我尝试过 就算所有代码都加锁 也会有死链接的情况 websocket刚接触 对多线程这块很不熟
萋小磊 发表于 2022-11-14 00:19
ppgjx 发表于 2022-11-13 23:51
[mw_shl_code=java,true]@ServerEndpoint(path = "/websocket",port = "${testWebsocketPort}",allIdleTi ...

对 你这个锁有问题,  你 sync 是 当前的这个 map.

再者就是 ConcurrentHashMap 是线程安全的, 他内部的方法加锁了的
你直接删除你的 sync 就行
写起来没那么复杂的
 楼主| ppgjx 发表于 2022-11-14 11:47
萋小磊 发表于 2022-11-14 00:19
对 你这个锁有问题,  你 sync 是 当前的这个 map.

再者就是 ConcurrentHashMap 是线程安全的, 他内部 ...

我没理解你的意思 这个map不是私有的 是全部websocket实例共享的一个 加锁其他的新连接应该是阻塞的吧
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-26 08:57

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表