Naylor 发表于 2023-1-28 17:00

WPF基于WebSocket的实时消息传递设计



# 概述


web管理系统中可以对业务数据执行新增和删除,现在需要当业务数据发生新增或删除操作后,尽可能实时的反应到WPF客户端上面。

web管理系统用VUE编写,后端服务为SpringBoot,WPF客户端基于.Netframework4.8编写。

# 整体架构

[!(https://s1.ax1x.com/2023/01/19/pS8ty1U.png)](https://imgse.com/i/pS8ty1U)



# 设计

## 流程设计
* 用户在浏览器界面执行新增业务数据操作,调用后端新增接口
* WPF客户端在启动的时候初始化websocket客户端,并创建对server的监听
* 后端新增接口先将数据落库,而后调用websocket服务端产生消息,消息在产生后立马被发送到了正在监听中的websocket-client
* websocket-server和websocket-client是一对多的关系,如何保证业务数据被正确的分发?监听的时候给server端传递一个全局唯一的clientId,业务数据在产生的时候关联到一个BizId上面,只要保证clientId=BizId就可以了。
* 删除流程和新增类似


## 程序设计

### WebSocketServer
#### 概述

WebSocketServer端采用SpringBoot框架实现,通过在springboot-web项目中集成 org.springframework.boot:spring-boot-starter-websocket
实现websocket的能力。


#### 新增pom

~~~


<!--  websocket  -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


~~~

#### 新增配置类

~~~


import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}



~~~

#### 创建websocket端点

~~~


import com.alibaba.fastjson.JSON;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/ws/label/{clientId}")
@Component
public class LabelWebSocket {
    /**
     * session list
     */
    private static ConcurrentHashMap<String, Session> sessionList = new ConcurrentHashMap<>();
    /**
     * 当前 clientId
     */
    private String currentClientId = "";
    @OnOpen
    public void open(Session session, @PathParam("clientId") String clientId) throws IOException {
        if (sessionList.containsKey(clientId)) {
            sessionList.remove(clientId);
        }
        sessionList.put(clientId, session);
        currentClientId = clientId;
        this.sendMsg(session, "connectok");
    }
    @OnClose
    public void close(Session session) throws IOException {
        sessionList.remove(currentClientId);
        System.out.println("连接关闭,session=" + JSON.toJSONString(session.getId()));
    }
    @OnMessage
    public void receiveMsg(Session session, String msg) throws IOException {
        this.sendMsg(session, "接收到的消息为:" + msg);
//        throw new RuntimeException("主动抛异常");
    }
    @OnError
    public void error(Session session, Throwable e) throws IOException {
        System.out.println("连接异常,session=" + JSON.toJSONString(session.getId()) + ";currentClientId=" + currentClientId);
        this.sendMsg(session, "发生异常,e=" + e.getMessage());
        e.printStackTrace();
    }
    /**
     * @param clientId
     * @param msg
     */
    public boolean sendMsg(String clientId, String msg) throws IOException {
        if (sessionList.containsKey(clientId)) {
            Session session = sessionList.get(clientId);
            this.sendMsg(session, msg);
            return true;
        } else {
            return false;
        }
    }
    private void sendMsg(Session session, String msg) throws IOException {
        session.getBasicRemote().sendText(msg);
    }
}



~~~

### WebSocketClient

#### 概述
WebSocketClient端集成在WPF应用客户端中,通过前期调研,选中 WebSocketSharp作为websocketclient工具,WebSocketSharp 是托管在Github的开源项目,MITLicense,目前4.9K的star。

#### 安装WebSocketSharp

~~~

//nuget

Install-Package WebSocketSharp -Pre

~~~

#### 初始化client

~~~

WebSocket ws = new WebSocket("ws://127.0.0.1:8083/ws/xx/clientId");


~~~

#### 创建连接

~~~


private void InitWebSocket()
{
    ws.OnOpen += (sender, e) =>
    {
        Console.WriteLine("onOpen");
    };
    //允许ping
    ws.EmitOnPing = true;
    //接收到xiaoxi
    ws.OnMessage += (sender, e) =>
    {
        ReceiveMessage(sender, e);
    };
    ws.Connect();
    //发送消息
    //ws.Send("BALUS")
    ;
}
private void ReceiveMessage(object sender, MessageEventArgs e)
{
    if (e.IsText)
    {
        // Do something with e.Data.like jsonstring
        Console.WriteLine(e.Data);
        return;
    }
    if (e.IsBinary)
    {
        // Do something with e.RawData. like  byte[]
        return;
    }
    if (e.IsPing)
    {
        // Do something to notify that a ping has been received.
        return;
    }
}



~~~

#### 跨线程更新UI

由于 WebSocketSharp 会创建线程来处理 ReceiveMessage ,而WPF中子线程是无法更新UI的,所以需要引入 Dispatcher 来实现跨线程更新UI。

##### 获取当前线程名字

~~~

 //当前线程

string name = Thread.CurrentThread.ManagedThreadId.ToString();


~~~

##### 示例代码

~~~


private void ReceiveMessage(object sender, MessageEventArgs e)
{
    if (e.IsText)
    {
        // Do something with e.Data.like jsonstring
        Console.WriteLine(e.Data);
        //当前线程
        string name = Thread.CurrentThread.ManagedThreadId.ToString();
        App.Current.Dispatcher.Invoke((Action)(() =>
        {
            Image lab = new Image();
            lab.Uid = "123456";
            lab.Name = "labName";
            lab.Width = 50; lab.Height = 50;
            string url = "http://xxx:xxx/img/louyu.png";
            BitmapImage bitmapImage = HttpUtil.getImage(url);
            lab.Source = bitmapImage;
            lab.AddHandler(MouseLeftButtonDownEvent, new MouseButtonEventHandler(LabelClick));
            Canvas.SetTop(lab, 800);
            Canvas.SetLeft(lab, 600);
            this.cav.Children.Add(lab);
        }));
        return;
    }
}



~~~


#### 重连机制

当websocket-server重启,此时client将丢失和server的连接,如果不引入重连机制,那么在手动重启client前,业务数据将无法实时更新,这在实际使用中肯定是不被允许的,所以需要引入重连机制。当捕捉到连接关闭之后,尝试重新连接。

重试次数5次,重试间隔5s,示例代码如下:

~~~

//连接关闭
wsLabel.OnClose += (sender, e) =>
{
    OnWsLabelClose(sender, e);
};


......


int retry = 0;
/// <summary>
/// 连接关闭
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void OnWsLabelClose(object sender, CloseEventArgs e)
{
    Log.LogInfo.Info("OnWsLabelClose,retry=" + retry);
    if (retry < 5)
    {
        retry++;
        Thread.Sleep(5000);
        wsLabel.Connect();
    }
    else
    {
        wsLabel.Log.Error("The reconnecting has failed.");
        Log.LogInfo.Info("重新连接失败,超过最大重试次数");
    }
}


      
~~~


## 接口设计

### 新增接口

#### 概述

目前WebSocketServer和web后端服务是在同一个SpringBoot的工程中,所以只要将WebSocketServer托管到SpringContainer中,web后端服务可以通过 DI 的方式直接访问 WebSocketEndPoint。

如果考虑程序的低耦合,可以在WebSocketServer和web后端服务之间架设一个MQ。


#### 核心代码
~~~

    @Autowired
    private LabelWebSocket ws;
    @GetMapping("/create")
    public boolean createLabel() throws IOException {
        String cameraId = "cml";
        //todo
        boolean result = ws.sendMsg(cameraId, "新增标签");
        return result;
    }


~~~





# 风险

## 分布式风险

当前在 WebSocketServer 中,已经连接的client信息是记录在当前进程的cache中,如果服务做横向扩容,cache信息无法在多实例进程中传递,将导致无法正确的处理业务数据,并可能会发生意想不到的异常和bug,此问题在并发越高的情况下造成的影响越大

## 资源风险

web后端服务为基于java语言的springboot程序,这种类型程序的特点是内存消耗特别严重。WebSocketServer服务在本项目中仅用作消息中间件,连通web后端服务和WPF客户端。

首先WebSocketServer没有太多的计算能力的消耗,内存消耗会随着连接客户端数量的增长而增长,网络将是最大的开销,一方面需要转发来自web后端服务的业务数据,并和WPF客户端保持长连接;另一方面WebSocketServer和WPF客户端的交互可能会走公网,而其和web后端服务必然是在局域网环境。

综上,将web后端服务和WebSocketServer分开部署对于硬件资源成本和利用率来说是最好的选择。


## 高可用风险

未引入重试机制,当某一个环节失败之后,将导致异常情况发生。

奈何心善 发表于 2023-1-28 18:47

高大上,最然看不懂,也得来学学。

ab123 发表于 2023-1-29 09:04

写的很清晰,学习一下web和客户端的同步思路,之前没做过客户端,wpf看着有点吃力。

Naylor 发表于 2023-1-29 10:27

ab123 发表于 2023-1-29 09:04
写的很清晰,学习一下web和客户端的同步思路,之前没做过客户端,wpf看着有点吃力。

WPF还是很好入门的,只要有java或者c#的基础,wpf一周就可以上手了
页: [1]
查看完整版本: WPF基于WebSocket的实时消息传递设计