吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 673|回复: 1
收起左侧

[求助] java websocket线程安全问题

[复制链接]
ppgjx 发表于 2022-11-12 20:53
[Java] 纯文本查看 复制代码
@ServerEndpoint(path = "/websocket",port = "${testWebsocketPort}",allIdleTimeSeconds = "10")
@Slf4j
public class UserWebSocket{



    //redis
    private static  StringRedisTemplate redisTemplate;

    @Autowired
    public  void setRedisTemplate(StringRedisTemplate redisTemplate) {
        UserWebSocket.redisTemplate = redisTemplate;
    }
    @Autowired
    public  void setPxUserService(PxUserService pxUserService) {
        UserWebSocket.pxUserService = pxUserService;
    }


    //用户服务类
    private static  PxUserService pxUserService;


    //保存所有客户连接
    public static final Map<String, WsClient> clients = new ConcurrentHashMap<>();


    //有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @BeforeHandshake
    public void handshake(Session session, HttpHeaders headers) {
        System.out.println("新链接: " +
                "" + session.channel().id().asShortText() + "当前在线: " + clients.size());
        //设置协议
        session.setSubprotocols("stomp");
    }

    //当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
    @OnOpen
    public void onOpen(Session session, HttpHeaders headers,@RequestParam String uid,@RequestParam String token) {

        log.info(String.format("新用户加入: uid-%s id-%s" ,uid,session.channel().id().asShortText()));
        //是否登录
        if(pxUserService.isLogin(uid,token)){
            //判断是否已连接 已连接则移除
            WsClient client1 = clients.get(uid);
            if(null != client1){
                client1.getSession().close();
                clients.remove(uid);
            }
            //登录成功 写入信息
            UserInfo userInfo = pxUserService.getUserInfo(uid);
            WsClient client = new WsClient();
            client.setSession(session);
            client.setUserInfo(userInfo);
            clients.put(uid,client);
        }else {
            //未登录
            sendMsgErr("token错误",session);
            session.close();
        }

    }


    //当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session
    @OnClose
    public void onClose(Session session) throws IOException {
        //清理uid
        clients.forEach((k,y)->{
            if(y.getSession().equals(session)){
                clients.remove(k);
            }
        });
        System.out.println("链接关闭: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
    }

    //当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable
    @OnError
    public void onError(Session session, Throwable throwable) {
        //清理uid
        clients.forEach((k,y)->{
            if(y.getSession().equals(session)){
                y.getSession().close();
                clients.remove(k);
            }
        });
        System.out.println("链接异常: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
        throwable.printStackTrace();
    }

    //当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String
    @OnMessage
    public void onMessage(Session session, String message) throws Exception {
        System.out.println("消息: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
        System.out.println(message);
        WsClient userInfo = getUserInfo(session);
        if(null != userInfo){
            sendMsgOkAll(userInfo.getUserInfo().getUserId() + "|" + message);
        }
    }

    //当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]
    @OnBinary
    public void onBinary(Session session, byte[] bytes) {
        for (byte b : bytes) {
            System.out.println(b);
        }
        session.sendBinary(bytes);
    }

    //当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object
    @OnEvent
    public void onEvent(Session session, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    System.out.println("read idle");
                    break;
                case WRITER_IDLE:
                    System.out.println("write idle");
                    break;
                case ALL_IDLE:
                    System.out.println("all idle");
//                    clients.forEach((k,y)->{
//                        if(y.getSession().equals(session)){
//                            y.getSession().close();
//                            clients.remove(k);
//                        }
//                    });
//                    System.out.println("进入空闲断开链接: " + session.channel().id().asShortText() + "当前在线: " + clients.size());
                    break;
                default:
                    break;
            }
        }
    }


    /**
     * 成功消息
     * @param ans
     * @param session
     */
    public void sendMsgOk(Object ans,Session session){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
        wsServerAns.setData(ans);
        session.sendText(JSON.toJSONString(ans));
    }

    /**
     * 错误消息
     * @param msg
     * @param session
     */
    public void sendMsgErr(String msg,Session session){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.ERR.getCode(), msg);

        session.sendText(JSON.toJSONString(wsServerAns));
    }


    /**
     * 广播所有用户
     * @param msg
     */
    public void sendMsgOkAll(String msg){
        WsServerAns wsServerAns = new WsServerAns(WsResultCode.SUCCESS.getCode(), WsResultCode.SUCCESS.getMsg());
        wsServerAns.setData(msg);
        clients.forEach((k,y)->{
            y.getSession().sendText(JSON.toJSONString(wsServerAns));
        });

    }

    /**
     * 获取用户的信息
     * @param session
     * @return
     */
    public WsClient getUserInfo(Session session){
        Iterator<WsClient> iterator = clients.values().iterator();
        while (iterator.hasNext()){
            WsClient next = iterator.next();
            if(next.getSession().equals(session)){
                return next;
            }
        }
        return null;
    }

}






大佬们 最近在研究websocket 用户进入app后进行websocket的链接 然后写了这段登录代码 有什么什么地方可以优化呢? 或者哪里有bug或者是线程安全问题 对多线程不是很熟悉

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

leedyx1990 发表于 2022-11-12 23:54
服务端的多线程已经由框架处理了,只要注意公共字段的线程安全性就好了
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-12 03:56

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表