本帖最后由 HK仅輝 于 2022-4-18 15:40 编辑
请问这里应该怎么加锁?报错出现在“服务器端推送消息”的 pushMessage() 方法(约第 94 行)中,堆栈如下:
at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
[Java] 纯文本查看 复制代码
package org.jeecg.modules.message.websocket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
/**
* [url=home.php?mod=space&uid=686208]@AuThor[/url] scott
* [url=home.php?mod=space&uid=686237]@date[/url] 2019/11/29 9:41
* @Description: 此注解相当于设置访问URL
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
public class WebSocket {
private Session session;
private String userId;
private static final String REDIS_TOPIC_NAME = "socketHandler";
@Resource
private JeecgRedisClient jeecgRedisClient;
/**
* 缓存 webSocket连接到单机服务class中(整体方案支持集群)
*/
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
private static Map<String, Session> sessionPool = new HashMap<String, Session>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 服务端推送消息
*
* @param userId
* @param message
*/
public void pushMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 服务器端推送消息
*/
public void pushMessage(String message) {
try {
webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
} catch (Exception e) {
e.printStackTrace();
}
}
@OnMessage
public void onMessage(String message) {
//todo 现在有个定时任务刷,应该去掉
log.debug("【websocket消息】收到客户端消息:" + message);
JSONObject obj = new JSONObject();
//业务类型
obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
//消息内容
obj.put(WebsocketConst.MSG_TXT, "心跳响应");
for (WebSocket webSocket : webSockets) {
webSocket.pushMessage(message);
}
}
/**
* 后台发送消息到redis
*
* @param message
*/
public void sendMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
BaseMap baseMap = new BaseMap();
baseMap.put("userId", "");
baseMap.put("message", message);
jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息
*
* @param userId
* @param message
*/
public void sendMessage(String userId, String message) {
BaseMap baseMap = new BaseMap();
baseMap.put("userId", userId);
baseMap.put("message", message);
jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息(多人)
*
* @param userIds
* @param message
*/
public void sendMessage(String[] userIds, String message) {
for (String userId : userIds) {
sendMessage(userId, message);
}
}
}
[Java] 纯文本查看 复制代码
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1258)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1220)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByCompletion(WsRemoteEndpointImplBase.java:210)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendStringByFuture(WsRemoteEndpointImplBase.java:198)
at org.apache.tomcat.websocket.WsRemoteEndpointAsync.sendText(WsRemoteEndpointAsync.java:53)
at org.jeecg.modules.message.websocket.WebSocket.lambda$pushMessage$0(WebSocket.java:96)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895)
at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
at org.jeecg.modules.message.websocket.WebSocket.pushMessage(WebSocket.java:96)
at org.jeecg.modules.message.websocket.WebSocket.onMessage(WebSocket.java:113)
at sun.reflect.GeneratedMethodAccessor1426.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.tomcat.websocket.pojo.PojoMessageHandlerWholeBase.onMessage(PojoMessageHandlerWholeBase.java:80)
at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:415)
at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:129)
at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:515)
at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:301)
at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:133)
at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:85)
at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:183)
at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:162)
at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:156)
at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:887)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1684)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
|