吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1211|回复: 1
收起左侧

[求助] Jeecg Boot webSocket推送报并发冲突错误怎么加锁

[复制链接]
HK仅輝 发表于 2022-4-18 14:40
本帖最后由 HK仅輝 于 2022-4-18 15:40 编辑

请问应该怎么加锁?报错发生在"服务器端推送消息"的 pushMessage() 方法中(源文件第 94 行附近,堆栈指向 WebSocket.java:96):

        at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
        at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
[Java] 纯文本查看 复制代码
package org.jeecg.modules.message.websocket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
 * [url=home.php?mod=space&uid=686208]@AuThor[/url] scott
 * [url=home.php?mod=space&uid=686237]@date[/url] 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
public class WebSocket {

    private Session session;

    private String userId;

    private static final String REDIS_TOPIC_NAME = "socketHandler";

    @Resource
    private JeecgRedisClient jeecgRedisClient;

    /**
     * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
     */
    private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
    private static Map<String, Session> sessionPool = new HashMap<String, Session>();


    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }


    /**
     * 服务端推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 服务器端推送消息
     */
    public void pushMessage(String message) {
        try {
            webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @OnMessage
    public void onMessage(String message) {
        //todo 现在有个定时任务刷,应该去掉
        log.debug("【websocket消息】收到客户端消息:" + message);
        JSONObject obj = new JSONObject();
        //业务类型
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        //消息内容
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        for (WebSocket webSocket : webSockets) {
            webSocket.pushMessage(message);
        }
    }

    /**
     * 后台发送消息到redis
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息
     *
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * 此为单点消息(多人)
     *
     * @param userIds
     * @param message
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }

}


[Java] 纯文本查看 复制代码
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
        at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1258)
        at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1220)
        at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByCompletion(WsRemoteEndpointImplBase.java:210)
        at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByFuture(WsRemoteEndpointImplBase.java:198)
        at org.apache.tomcat.websocket.WsRemoteEndpointAsync.sendText(WsRemoteEndpointAsync.java:53)
        at org.jeecg.modules.message.websocket.WebSocket.lambda$pushMessage$0(WebSocket.java:96)
        at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895)
        at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
        at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
        at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
        at sun.reflect.GeneratedMethodAccessor1426.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80)
        at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:415)
        at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:129)
        at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:515)
        at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:301)
        at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
        at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:85)
        at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:183)
        at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:162)
        at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:156)
        at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:887)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1684)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:748)



发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

/│\云。 发表于 2022-4-18 15:50
webSockets.forEach(ws -> {
    // Serialize writers per session so two threads never write to the same session at once.
    synchronized (ws.session) {
        try {
            // Blocking send: the write has finished before the lock is released. With
            // getAsyncRemote() the call returns while the frame is still being written, so the
            // next thread could acquire the lock and still hit TEXT_FULL_WRITING.
            ws.session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});


可以这样试试看,不过尽量不要像这样循环推送。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-25 15:36

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表