RocketMq】 RocketMq 4.9.4 Windows-docker 部署
# 引言
注意个人使用了**4.9.4**的Rocketmq版本进行学习部署使用。因为windows上使用docker部署不同版本的RockerMq可能会有不同的情况,这里仅保证4.9.4的版本可以正确运行。
> Windows 配置上要比Linux 配置麻烦一些,尤其是复制路径需要把反斜杠换成斜杠。
# 个人环境
- CPU:12代英特尔12700
- 内存 32G
- 操作系统:Win11 21H(12代英特尔大小核优化,没办法)
- RocketMq:4.9.4 版本
- Docker:官网当时能下到的最新版
- 镜像:官方提供的4.9.4的打包镜像,非用户二次打包
# 基础配置
## WSL安装
Docker在Win11里面存在一些其他问题,一上来启动的时候Docker会**要求安装WSL**。
1. 安装Docker这一步直接去官网找最新版本下载安装即可,这里略过截图了。
2. 个人使用的Win11,发现报错需要升级WSL,如果没有的话可以忽略,如果出现类似报错的话,看如下链接:[旧版 WSL 的手动安装步骤 | Microsoft Learn](https://learn.microsoft.com/zh-cn/windows/wsl/install-manual#step-4---download-the-linux-kernel-update-package)
3. 通过上面的链接修复WSL的版本问题之后,重启Docker,可以正常使用Docker了。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108141433.png)
> 介绍:[安装 WSL | Microsoft Learn](https://learn.microsoft.com/zh-cn/windows/wsl/install)
> 本指南将介绍如何使用适用于 Linux 的 Windows 子系统安装 Linux 发行版(例如 Ubuntu、OpenSUSE、Kali、Debian、Arch Linux 等)。 通过 WSL,你可使用与 Windows 工具(如 PowerShell 或 Visual Studio Code)完全集成的 Linux 工具(如 Bash 或 Grep),而无需双启动。
这里简单描述个人的修复步骤:
使用管理员运行CMD,然后执行下面的命令,如果提示如图说明操作成功
```
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
Deployment Image Servicing and Management tool
Version: 10.0.22000.653
Image Version: 10.0.22000.1098
Enabling feature(s)
[==========================100.0%==========================]
The operation completed successfully.
```
下载Linux内核包
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108141531.png)
将 WSL 2 设置为默认版本
```
wsl --set-default-version 2
```
这样处理之后Windows Docker 就可以正常启动了,可以看到比Linux和Mac都要麻烦不少。
4. 拉取Docker的Rocketmq镜像之前需要在自己定义的目录执行下面的命令提前确定好RocketMq的日志以及持久化文件存储位置,因为这里借用了**Git Bash**的命令行工具,所以命令使用的是Linux的相关命令(比较建议使用这个小技巧)。
```powershell
# 所在目录 /d/adongstack/run/docker/rocketmq
$ mkdir -p./data/namesrv/logs~
# 所在目录 /d/adongstack/run/docker/rocketmq
$ mkdir -p ./data/broker/conf
# 所在目录/d/adongstack/run/docker/rocketmq
$ mkdir -p ./data/broker/logs
# 所在目录 /d/adongstack/run/docker/rocketmq
$ mkdir -p ./data/broker/store
```
> 上方奇怪的路径显示来自:powershell
5. 此外在正式拉取镜像之前也可以先把Docker的拉取镜像仓库换一下,这里个人第一次拉取的速度比较快所以没有换。
![[国内镜像仓库.png]]
注意JSON的格式,如果图方便把下面这一段代码完全复制然后贴进去即可,或者只**追加**"registry-mirrors" 部分。
```json
{
"builder": {
"gc": {
"defaultKeepStorage": "20GB",
"enabled": true
}
},
"experimental": false,
"features": {
"buildkit": true
},
// 下面为追加内容(本行复制后请删除)
"registry-mirrors": [
"https://hub-mirror.c.163.com",
"https://mirror.baidubce.com"
]
}
```
## Docker Namesrv 配置
1. 一切准备工作做好,我们开始拉取docker镜像,然后进行启动测试,配置使用默认的即可,下面拉取rocketmq 4.9.4 的镜像命令:
```shell
$ docker pull apache/rocketmq:4.9.4
4.9.4: Pulling from apache/rocketmq
.... 省略一些内容
Status: Downloaded newer image for apache/rocketmq:4.9.4
# 下面的结果说明拉取成功了
docker.io/apache/rocketmq:4.9.4
```
如果结尾出现类似的信息说明拉取成功。下面开始运行镜像,Docker Window 提供可视化的配置界面,但是我觉得命令的方式快很多,这里仅仅截图展示一下。
![[默认启动docker配置.png]]
下面介绍命令的方式启动关键组件。
2. 准备namesrv的数据存储和日志存储地址。
```
D:/adongstack/run/docker/rocketmq/data/namesrv/logs
D:/adongstack/run/docker/rocketmq/data/namesrv/stores
```
然后准备构建**namesrv**的命令,注意修改两个-V 的地址:
> 网上有很多日志在`/root/logs`下面的,个人实际使用的情况和此情况不符合
```d
docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v D:/adongstack/run/docker/rocketmq/data/namesrv/logs:/home/rocketmq/logs \
-v D:/adongstack/run/docker/rocketmq/data/namesrv/stores:/home/rocketmq/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
apache/rocketmq:4.9.4 \
sh mqnamesrv
```
> 注意映射原始地址,一个是 logs,另一个是 store,为了确认是否映射正确,可以进入`docker exec -it '镜像地址'`的方式进入到镜像的根目录,然后一层层网上找到对应的原始地址(此时也可以发现Store 目录也是空的)
> ![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109072158.png)
相关的参数作用如下:
| 参数 | 作用 |
| -------------------------------- | ------------------------------------------------------------ |
| -d | 守护进程启动 |
| --restart=always | docker重启时候容器自动重启 |
| \--name rmqnamesrv | 把容器的名字设置为rmqnamesrv |
| -p 9876:9876 | 端口号配置,格式:容器的端口号:宿主机的端口号 |
| -v xxxx/namesrv/logs:/home/rocketmq/logs| 把容器内的/home/rocketmq/logs日志目录挂载到宿主机的自定义路径目录(注意win需要带盘符) |
| -v xxxx/namesrc/store:/home/rocketmq/store | 把容器内的/home/rocketmq/store 数据存储目录挂载到宿主机的自定义路径目录(注意win需要带盘符) |
| rmqnamesrv | 容器的名字 |
| -e “MAX_POSSIBLE_HEAP=100000000” | 该容器的最大堆内存为100000000,基本为无限大 |
| apache/rocketmq:4.9.4 | 使用的镜像名称,比如这里使用了官方4.9.4的镜像 |
| sh mqnamesrv | 启动namesrv服务 |
运行之后通常会返回一个唯一ID比如`$ 53d8bdda518f1a4ed24a45c7c66a463413933d04ee7b6e5ac7a9150dea54ca48`,之后可以立即检查本地映射的目录是否存在对应的日志文件,注意因为此时刚刚启动store目录的内容是空的。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109071410.png)
## Docker Broker 配置
1. 同理首先确定broker的配置文件以及日志的存储位置,还有数据文件的存储位置,个人的配置路径如下:
```d
D:/adongstack/run/docker/rocketmq/data/broker/logs
D:/adongstack/run/docker/rocketmq/data/broker/conf
D:/adongstack/run/docker/rocketmq/data/broker/stores
```
2. 配置broker的核心配置`broker.conf`,这个配置的内容摘自官方源码的文件,底部新增了自定义的配置。
```java
# mq集群名称,注意这里进行了更改
brokerClusterName = YYDSCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-master
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 00
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 72
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
#设置broker节点所在服务器的ip地址(公网IP),win系统下,用ipconfig查一下你的主机ip
brokerIP1 = 192.168.0.107
# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
#补充
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full diskMaxUsedSpaceRatio=95
```
3. 把配置文件放到最开始定义的路径`D:/adongstack/run/docker/rocketmq/data/broker/conf`当中。
> 提醒 NameServer启动之后的RocketMq根路径为:`/home/rocketmq/rocketmq-4.9.4/con`,Broker同理。
接着是构建Broker,个人起初参考了网上的博客文章,结果发现构建出来均出现找不到conf的问题。
```java
.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:119)
at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:57)
```
毫无疑问是个人尝试的命令出错了,在经过反复尝试之后修复,注意rocketmq的home目录为: `/home/rocketmq/rocketmq-4.9.4`
```d
docker run -d\
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v D:/adongstack/run/docker/rocketmq/data/broker/log:/home/rocketmq/logs \
-v D:/adongstack/run/docker/rocketmq/data/broker/store:/home/rocketmq/store \
-v D:/adongstack/run/docker/rocketmq/data/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
apache/rocketmq:4.9.4 \
sh mqbroker -c ../conf/broker.conf
```
这里有一个踩坑点是最后一条命令`sh mqbroker -c ../conf/broker.conf`,个人尝试替换为`/home/rocketmq/rocketmq-4.9.4/conf/broker.conf`是**无效并且报错**的,此外这里需要特别注意一定要自定义`broker.conf`启动,否则默认情况下broker默认走Docker内网会导致我们的访问变成外部访问而**访问不到**。
不管是Linux还是Windows,都需要确保启动rocketmq的用户是具备目录操作权限的。windows通常没这个烦恼,但如果是Linux则**本地 logs 目录一定要是 777 权限**。
参数配置的作用如下:
| 参数 | 作用 |
| ------------------------------------------------------------ | ------------------------------------------------------------ |
| -d | 守护进程的方式启动 |
| --restart=always | docker重启时候镜像自动重启 |
| --name rmqbroker | 容器的名字设置为rmqbroker |
| -link rmqnamesrv:namesrv | 和rmqnamesrv容器通信,rmqnamesrv代表本身的link名称,namesrv 表示 |
| -p 10911:10911 | 把容器的非vip通道端口挂载到宿主机 |
| -p 10909:10909 | 把容器的vip通道端口挂载到宿主机 |
| -e “NAMESRV_ADDR=namesrv:9876” | 指定namesrv的地址为本机namesrv的ip地址:9876 |
| -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker | 指定broker服务的最大堆内存,本次指定为200000000 |
| apache/rocketmq:4.9.4 | 使用的镜像名称 |
| sh mqbroker -c ../broker.conf | 指定配置文件启动broker节点 |
启动完成之后,第一时间检查一下本地的映射目录和对应的容器内部文件是否正确进行映射。
!(https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109075008.png)
!(https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109075132.png)
!(https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109075106.png)
# Docker Dashboard 配置
在当前的版本中,可视化界面被更名为**dashboard**并且被移到了一个新项目当中,具体可以看下面的链接:
(https://github.com/apache/rocketmq-dashboard)
> **Notice**: Console has renamed to dashboard and transfered the new repo, it will graduate in the near future, and welcome you to fill in the user due diligence.
这里按照文档的`quick start`进行拉取:
```d
docker pull apacherocketmq/rocketmq-dashboard:latest
```
注意提示的最低版本以及配置要求:
```d
#### Prerequisite
1.64bit OS, Linux/Unix/Mac is recommended;
2.64bit JDK 1.8+;
3.Maven 3.2.x;
```
拉取之后Docker的内容如下:
![]
接着我们使用下面的命令启动可视化界面,注意要改成**宿主机 Ip 和 端口号**(也就是自己的Windows电脑联网对应的IP,以太网或者Wifi):
```d
docker run -d \
--name rocketmq-dashboard \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.107:9876" \
-p 18080:8080 \
-t apacherocketmq/rocketmq-dashboard:latest
```
启动之后,访问地址`http://localhost:18080(这里修改了宿主机的映射端口18080)。进入主页之后的内容如下:
![]
到此就可以开心愉快的玩耍了。
# 踩坑点
虽然整个配置过程看起来很简单,但是如果不注意细节的话实际上有不少的踩坑点,这里一一叙述:
## 映射路径问题
网上映射的路径千奇百怪的,这里拿官方镜像映射发现路径如下:
```
/home/rocketmq/logs
/home/rocketmq/store
/home/rocketmq/rocketmq-4.9.4/conf/broker.conf
```
需要注意这些路径一个字母都不能错,否则启动是不会报错的,但是会出现你在映射路径的修改无法影响到整个容器的配置。
## Producer查询问题
老生常谈的问题,解决办法是在自己**本地测试**的时候不要调用`producer.shutdown()`方法(线上不能这么干),因为这个方法会在发送完成之后直接删除生产者组。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109215534.png)
> 另外我想吐槽一下这个提示的用户体验是认真的么,带鱼屏看这种页面怕不是脖子都要甩断了(虽然我不是)。
> 此外这一堆报错看着也比较瘆得慌。
## DashBoard 端口问题
dashboard用的是SpringBoot默认的8080端口,容易和自己的项目冲突,所以建议启动容器的时候映射到18080,可以避免端口占用的烦恼。
# 实验
下面简单做一个测试,为了追求快速,这里把快速拉取一个SpringBoot、pom.xml引用等过程跳过了,直接上代码。
> 注意 Broker 启动的时候配置文件指定了自动创建topic以及自动创建订阅组,所以不需要再用RocketMq的命令去创建队列,当然是自己学习的时候可以这样干,**生产是不能开启的**,切记!!!
> autoCreateSubscriptionGroup=true
autoCreateTopicEnable=true
```java
public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("test");
// 自己的本机IP
producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
producer.start();
for(int i = 0 ; i <1; i ++) {
// 1. 创建消息
Message message = new Message("test",// 主题
"TagA", // 标签
"key" + i, // 用户自定义的key ,唯一的标识
("Hello RocketMQ" + i).getBytes()); // 消息内容实体(byte[])
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
// 自己做实验需要关掉,否则会自动删除生产者组
// producer.shutdown();
}
```
最后到Producer去查询,成功看到内容:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221109222336.png)
# 小结
下面是个人在另一台Windows电脑上的执行成功的脚本:
```d
# 启动 NameServ
docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v E:/adongstack/run/docker/rocketmq/data/namesrv/logs:/home/rocketmq/logs \
-v E:/adongstack/run/docker/rocketmq/data/namesrv/store:/home/rocketmq/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
apache/rocketmq:4.9.4 \
sh mqnamesrv
# 启动 Broker
docker run -d\
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v E:/adongstack/run/docker/rocketmq/data/broker/logs:/home/rocketmq/logs \
-v E:/adongstack/run/docker/rocketmq/data/broker/store:/home/rocketmq/store \
-v E:/adongstack/run/docker/rocketmq/data/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
apache/rocketmq:4.9.4 \
sh mqbroker -c ../conf/broker.conf
# docker-dashboard 启动命令
docker run -d
--name rocketmq-dashboard
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=本机电脑IP:9876" -p 18080:8080 -t apacherocketmq/rocketmq-dashboard:latest
```
# 总结
总得来说需要格外小心Broker的配置,一定要自定义配置并且定义宿主机的IP,建议有条件使用Linux部署docker和使用Docker。 # 【RocketMq】RocketMq 高版本JDK编译报错问题处理
# 引言
简单记录RocketMq的JDK8以上版本的编译问题,在RocketMq的github - issue里面讨论还挺多的。
> 总得来说是个小问题,但是居然没啥文章介绍过,难道都是JDK8去部署RocketMq的源码的么?
# 报错问题
因为IDEA缓存的存在,很有可能看到这些内容不是爆红而是正常导入的,此时编译却会诡异般的报错。
- java: 程序包sun.nio.ch不存在
- sun.util.locale.BaseLocale.SEP不存在
**Idea的报错情况**
在Idea 2021.1 的版本中会出现如下问题,这个报错第一眼看着挺懵逼的:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108162226.png)
Idea 2022.1.3 版本就好很多,报错是人话了:
```d
java: package sun.util.locale is not visible
(package sun.util.locale is declared in module java.base, which does not export it)
```
# 排查
如果对于Java9的模块化有一丁点概念,基本这个问题都能迎刃而解,但是如果像我一样不懂的话,大概会在这个问题上卡上一段时间,并且谷歌也查不出直接关联的情况,不过查类似报错会跟你讲模块化的事情。
# 思考
关键的灵感来自于下面这篇文章,个人按照文章的思路,先把整个项目从maven到项目整个配置使用的统一JDK版本,所以索性全换成JDK11尝试。
(https://stackoverflow.com/questions/46280859/intellij-idea-error-java-invalid-source-release-1-9)
> 此外还参阅了文章: [(1) java9迁移注意事项_个人文章 - SegmentFault 思否](https://segmentfault.com/a/1190000013398709) ,这篇文章中介绍了迁移到高版本JDK之前建议先到jdk.1.9 转为模块化之后再往更高版本升级。
> 但从IDEA的默认下载的JDK在新版本中只有9、11 - 17 这几个大版本了,所以就没有尝试了。个人也建议先到JDK9的模块化先弄一遍再去升级。
先把整个**Compiler**项目的版本换成JDK 11,这个页面比较重要后续还会在用到:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108163606.png)
在RocketMq的`pom.xml`当中把编译版本改为**11**。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108163705.png)
保险期间,利用 `ctrl + shift + F`的快捷键把所有有可能指定的地方全改了。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108163929.png)
最后就是项目本身的版本了:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108165129.png)
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108165344.png)
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108165353.png)
这样一通配置之后,发现还是会报同样的错,所以可以确定是模块化的问题,但是问题是我在**编译的时候如何加参数**?
这里不再讨论JDK1.9 的模块化细节(足够新写几篇文章介绍),这里介绍解决办法,介绍Idea如何配置编译参数。
# Idea 如何添加编译参数?
Setting -> Build, Execution -> Java Compiler
最下方有关键词 **compiler paramter**:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108162919.png)
> 这里吐槽一下Idea的反智操作逻辑:修改完参数之后,你先去别的可输入框先点一下,然后**Aplly按钮会亮起来告诉你有改动过**,建议先确定能否Apply,先Apply再点OK。否则容易改了参数一个OK最后发现**修改无效**。
**store** 子项目编译参数设置为:
```
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
```
**test** 子项目的编译参数设置为:
```
--add-exports java.base/sun.util.locale=ALL-UNNAMED
```
加入之后按IDEA 右上角的小锤子编译一下,这个小问题圆满解决。
# 小结
长期JDK8选手,工作也不不允许用JDK8更高的版本,外加外部设施对于高JDK版本支持度不够,比如JDK11的**Jenkins**就有问题。
真实照应了[]里面的原文评论:
> 1. From my experience, the biggest issue is not the code itself, but the **surrounding infrastructure**(周边设施). For example replacing **Jenkins plugins that don't support Java 11 was painful.**
> 2. The biggest issue in my experience migrating java apps from version 8 it's related with java time precission:
真的只能期待一些周围开源组件强制要求用户使用高版本JDK才有可能推进了。
# 写到最后
不能当JDK8忠实粉丝,还是得**与时俱进,与时俱进**.....见到熟悉的提醒界面非常安心。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221108164006.png) # #rocketmq【RocketMq-生产者】消息发送者参数详解
# 引言
首先注意本次讨论的RokcetMq源码版本为 **4.9.4**,距离5.0发布 的没有多久。
这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
# ClientConfig 部分
ClientConfig 定义了一些配置的获取方法,定义了命名空间等参数。无论是消息的发送者还是消费者都是通用的。
下面根据本次的版本的源代码介绍相关参数。
| 名称 | 描述 | 参数类型 | 默认值 | 有效值 | 重要性 |
| ------------------------------- | ---------------------------------------------- | ------------- | ------------------------------------------------------------ | ------ | ------ |
| namesrvAddr | NameServer的地址列表 | String | 从-D系统参数rocketmq.namesrv.addr或环境变量。NAMESRV_ADDR | | |
| instanceName | 客户端实例名称 | String | 从-D系统参数rocketmq.client.name获取,否则就是DEFAULT | | |
| clientIP | 客户端IP | String | RemotingUtil.getLocalAddress() | | |
| namespace | 客户端命名空间 | String | | | |
| accessChannel | 设置访问通道 | AccessChannel | LOCAL | | |
| clientCallbackExecutorThreads | 客户端通信层接收到网络请求的时候,处理器的核数 | int | Runtime.getRuntime().availableProcessors() | | |
| pollNameServerInterval | 轮询从NameServer获取路由信息的时间间隔 | int | 30000,单位毫秒 | | |
| heartbeatBrokerInterval | 定期发送注册心跳到broker的间隔 | int | 30000,单位毫秒 | | |
| persistConsumerOffsetInterval | 作用于Consumer,持久化消费进度的间隔 | int | 默认值5000,单位毫秒 | | |
| pullTimeDelayMillsWhenException | 拉取消息出现异常的延迟时间设置 | long | 1000,单位毫秒 | | |
| unitName | 单位名称 | String | | | |
| unitMode | 单位模式 | boolean | false | | |
| vipChannelEnabled | 是否启用vip netty通道以发送消息 | boolean | 从-D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true | | |
| useTLS | 是否使用安全传输。 | boolean | 从-D系统参数tls.enable获取,否则就是false | | |
| mqClientApiTimeout | mq客户端api超时设置 | int | 3000,单位毫秒 | | |
| language | 客户端实现语言 | LanguageCode| LanguageCode.*JAVA* | | |
## namesrvAddr
NameServer 的地址列表。
## clientIp
```java
private String clientIP = RemotingUtil.getLocalAddress();
```
从代码中可以看到,使用`RemotingUtil#getLocalAddress` 获取IP信息,在当前版本中默认返回不是`127.0`或者`192.168`开头的 IPV4地址,否则尝试获取IPV6的地址,如果都找不到就用LocalHost地址。
## instanceName
```java
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
```
`instanceName`主要获取当前默认的系统参数客户端实例名称,它是客户端标识 CID 的组成部分
## unitName 单元名称
也是CID的组成部分之一,如果获取 NameServer 的地址是通过 URL 进行动态更新的话,会通过这个单元名称进行附加,用来区分不同的NameServer地址服务。
## clientCallbackExecutorThreads 回调线程池数量
表示public回调线程池的数量,默认为CPU的核数,通常这个值直接根据JVM获取的结果为基准即可。
```java
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
```
## namespace 命名空间
4.5.1 之后才加入的新机制。主要适用场景为全链路压测的时候可以利用不同的命名空间划分出真实消息和压测消息,使得线上业务正常执行的情况下同步处理测试流程。
## pollNameServerInterval NameServer同步间隔
生产者客户端默认每隔出30S向NameServer 更新Topic的相关信息,注意这个参数在消费端同样存在相同的配置,这个配置通常不建议修改。
```java
/**
* Pulling topic information interval from the named server */
private int pollNameServerInterval = 1000 * 30;
```
## heartbeatBrokerInterval Broker心跳间隔
客户端向 Broker 发送心跳包的时间间隔,默认为 30s,不建议修改该值。
```java
/**
* Heartbeat interval in microseconds with message broker */
private int heartbeatBrokerInterval = 1000 * 30;
```
## persistConsumerOffsetInterval
客户端持久化消息消费进度的间隔,默认为 5s,该值不建议修改。
```java
/**
* Offset persistent interval for consumer */
private int persistConsumerOffsetInterval = 1000 * 5;
```
# DefaultMQProducer 部分
这部分定义了日志和常见的使用消息队列方法,注意在类的开头定义了一个 **transient** 变量执行内部的保护方法。
官方文档中极少DefaultMQProducer配置如下:
| 名称 | 描述 | 参数类型 | 默认值 | 有效值 | 重要性 |
| -------------------------------- | ------------------------------------------------------------ | --------------- | ------------------------------------------ | ------ | ------ |
| producerGroup | 生产组的名称,一类Producer的标识 | String | DEFAULT_PRODUCER | | |
| createTopicKey | 发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值 | String | TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC | | |
| defaultTopicQueueNums | 自动创建topic的话,默认queue数量是多少 | int | 4 | | |
| sendMsgTimeout | 默认的发送超时时间 | int | 3000,单位毫秒 | | |
| compressMsgBodyOverHowmuc | 消息body需要压缩的阈值 | int | 1024 * 4,4K | | |
| retryTimesWhenSendFailed | 同步发送失败的话,rocketmq内部重试多少次 | int | 2 | | |
| retryTimesWhenSendAsyncFailed | 异步发送失败的话,rocketmq内部重试多少次 | int | 2 | | |
| retryAnotherBrokerWhenNotStoreOK | 发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发 | boolean | false | | |
| maxMessageSize | 客户端验证,允许发送的最大消息体大小 | int | 1024 *1024* 4,4M | | |
| traceDispatcher | 异步传输数据接口 | TraceDispatcher | null | | |
## DefaultMQProducerImpl 内部对象
`defaultMQProducerImpl` 比较意思,因为此对象是 `DefaultMQProducerImpl` 整个实现类的实际调用者,这里用了受保护的内部对象完成所有方法调用,用final是规避旧版本多个线程初始化对象非原子性的问题,同时保证持有的内部对象不可变。
```java
/**
* Wrapping internal implementations for virtually all methods presented in this class. */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
```
> 为什么这里要用 transient?
> transient 关键字确保对象被序列化之后不会泄漏 DefaultMQProducerImpl 对象。
## InternalLogger 日志对象
接着是日志对象,日志对象 InternalLogger 如下定义,内部实现比较简单,基本是一些info和debug日志打印。
```java
InternalLogger log = ClientLogger.getLog()
```
客户端日志的实现类存储路径时是:`${user.home}/logs/rocketmqlogs/rocketmq_client.log`,这个路径的获取细节在`org.apache.rocketmq.client.log.ClientLogger#createClientAppender`可以看到有关细节。使用`System.getProperty("user.home")`获取的路径在Unix系统中相当于用户的主目录。
> user.home 如果是 xxx 则是 /usr/home/xxx 为开始,比如个人的Mac电脑最终的存放地址为:`/Users/zxd/logs/rocketmqlogs/rocketmq_client.log`。
## producerGroup 消息组
表示发送者所属组定义如下,根据注释可以得知,gropu 可以实现生产者实例的聚合,主要用在事务的的时候需要使用到,而如果是非事务的消息,每一个进程都是唯一的,彼此没有关联。
有关事务的内容涉及需要用到Broker反查机制,这里不做过多牵扯,继续介绍。
```
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * important when transactional messages are involved. </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process. </p>
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">core concepts</a> for more discussion.
*/
private String producerGroup;
```
我们可以通过相关命令或者可视化工具查看发送者所属组的状态。注意默认的主题队列数量,RocketMq默认设置为4。
这里用了volatile保证多线程对于主题队列的数量时可见的,多个生产者实例观察的数量是一致的。
```java
/**
* Number of queues to create per default topic. */private volatile int defaultTopicQueueNums = 4;
```
## sendMsgTimeout 消息发送默认超时时间
消息默认发送的超时时间为3秒,
注意的是在 RocketMQ 4.3.0 版本之前由于存在重试机制,程序设置的设计为单次重试的超时时间,即如果设置重试次数为 3 次,则 `DefaultMQProducer#send` 方法可能会超过 9s 才返回。
```java
/**
* Timeout for sending messages. */
private int sendMsgTimeout = 3000;
```
主要的改动点在`org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl` 这个对象里面
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/202209280724815.png)
修复的方式比较简单粗暴,是增加一个纳秒值进行计算 ,如果请求时间超过发送请求的时间太久就抛出异常。下一次请求对应的扣除掉本次耗费的时间再进行重试,如果重试超过的总时间超过超时时间也同样抛出异常。
这就意味着如果超时次数设置10次,可能不到10次就会因为超时时间的判断抛出异常信息。
```java
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
```
##compressMsgBodyOverHowmuch 压缩阈值
默认情况下,如果消息的长度超过4K,那么RocketMq默认会对于消息开启压缩,虽然会增加CPU的性能损耗,但是可以有效减少网络方便的开销。
```java
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default. */
// 压缩消息体阈值,即默认压缩大于4k的消息体。
private int compressMsgBodyOverHowmuch = 1024 * 4;
```
```java
private boolean tryToCompressMessage(final Message msg) {
// 批量数据目前不支持压缩
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
// 压缩之后的数据
byte[] data = compressor.compress(body, compressLevel);
if (data != null) {
msg.setBody(data);
return true; }
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
```
## retryTimesWhenSendFailed 失败重试
同步消息发送重试次数。RocketMQ 客户端内部在消息发送失败时默认会重试 2 次。该参数与 `sendMsgTimeout` 联合生效,但是需要注意这个参数在SYNC模式下才会重试2次,如果是其他模式则默认是一次失败不再进行重试。
在SYNC模式只重试一次可以看下面代码:
```java
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
```
## retryTimesWhenSendAsyncFailed 异步消息重试
见名知义,**异步**消息发送重试次数,默认为 2,即重试 2 次,一共有 3 次机会。关键的代码在`org.apache.rocketmq.client.impl.MQClientAPIImpl#onExceptionImpl` 这个参数巨多的方法当中,简单判断当前的异步消息总的重试次数,如果重试多次超过次数则通过sendCallback回调发送异常。
```java
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve. */
private int retryTimesWhenSendFailed = 2;
```
## retryAnotherBrokerWhenNotStoreOK 失败向其他Broker重试
根据方法的本意按照道理来说如果客户端收到的结果不是 SEND_OK,应该直接向另外一个 Broker 重试,但根据代码分析目前这个参数并不能按预期运作,官方一致也没有关注过这个问题。
```java
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve. */
private int retryTimesWhenSendAsyncFailed = 2;
```
## maxMessageSize 最大消息体
允许发送的最大消息体,默认为 4M,具体可以看下面的判断,注意Broker也有 maxMessageSize 这个参数的设置,故客户端的设置不能超过服务端的配置:
客户端的发送限制如下:
```java
/**
* Maximum allowed message body size in bytes. */
private int maxMessageSize = 1024 * 1024 * 4; // 4M
...
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
```
maxMessageSize 另一个使用地点是在RocketMq的轨迹消息长度判断中,不过这一块的代码在2022年的上半年被某位大神大改优化过,里面的优化代码比较值得学习,但是因为这一块牵扯的内容比较大部头需要先放放,我们看其他参数内容。
```java
// 轨迹消息中累计到3/4左右的时候就进行合并提交
if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000) {
List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
traceExecutor.submit(asyncDataSendTask);
this.clear();
}
```
## sendLatencyFaultEnable 失败延迟规避
失败规避机制默认为false,它的含义是当Product向Broker发送消息失败之后,客户端的在内部重试的时候会规避掉上一次发送失败的Broker,并且一段时间内不会再向该Broker进行发送。
## notAvailableDuration 不可用延迟数组
不可用延迟数组,利用等比数列的时间发送消息,根据数组的设置在多少时间内不向Broker发送消息。从默认值可以看到这里是按照阶层的方式进行增长的。
```java
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
```
## latencyMax延迟最大值
设置消息发送的最大延迟级别,同样涉及了延迟推送机制。这里暂时略过。
```java
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
```
# MqAdmin
定义了一些基础的规范接口,由于和我们平时写业务代码的Service Interface类似,这里不在过多展开介绍,而是简单罗列一些比较常用的接口:
```java
/**
- String key:根据 key 查找 Broker,即新主题创建在哪些 Broker 上
- String newTopic:主题名称
- int queueNum:主题队列个数
- int topicSysFlag:主题的系统参数
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
/**
根据队列与时间戳,从消息消费队列中查找消息,返回消息的物理偏移量(在 commitlog 文件中的偏移量)。
MessageQueue mq:消息消费队列
long timestamp:时间戳
*/
long searchOffset(MessageQueue mq, long timestamp)
/**
查询消息消费队列当前最大的逻辑偏移量,在 consumequeue 文件中的偏移量。
*/
long maxOffset(final MessageQueue mq)
/**
查询消息消费队列当前最小的逻辑偏移量。
*/
long minOffset(final MessageQueue mq)
/**
返回消息消费队列中第一条消息的存储时间戳。
*/
long earliestMsgStoreTime(MessageQueue mq)
/**
根据消息的物理偏移量查找消息
*/
MessageExt viewMessage(String offsetMsgId)
/**
根据主题与消息的全局唯一 ID 查找消息。
*/
MessageExt viewMessage(String topic, String msgId)
/**
批量查询消息,其参数列表如下:
String topic:主题名称
String key:消息索引 Key
int maxNum:本次查询最大返回消息条数
long begin:开始时间戳
long end:结束时间戳
*/
QueryResult queryMessage(String topic, String key, int maxNum, long begin,long end)
```
# 写在最后
简单的进行一些API讲解,我们可以下具体使用到之后再来本文查阅会更有实际意义。 # 引言
RocketMq3.X的版本和Kafka一样是基于Zookeeper进行路由管理的,但是这意味着运维需要多部署一套Zookeeper集群,后来RocketMq选择去ZK最终出现了NameServ。NameServ作为RocketMq源码阅读的切入点非常不错,本文将会介绍Ver 4.9.4 版本的NameServ源码分析。
NameServer主要有两个功能,**Broker管理**和**路由信息管理**。
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221124055340.png)
整个NameServ实际代码只有几百行,因为本身出现根本目的就是替代ZK,所以角色类似ZK。在下面的截图中,**NamesrvStartup**为启动类,**NamesrvController**为核心控制器,**RouteInfoManager**为路由信息表,整个NameServ基本上就是围绕这三个类做文章。
NameServe的类结构图如下:
![](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221124060742.png)
# 源码分析
## NameServ 启动
NameServ的启动步骤主要有下面几个点:
1. 创建NameServ控制器,解析和创建重要配置,重要核心控制器创建并注入配置。
2. NameServ核心控制器初始化,NettyServ服务等次重要相关组件创建和初始化。
3. 启动定时任务,定期扫描过期Broker并且移除不活跃Broker,定期打印系统全部的KV配置。
4. 注册JVM钩子函数优雅关闭资源(Netty和线程池),启动Netty。
5. Netty服务启动
在了解代码细节之前,我们先画一个时序图了解NameServ的启动过程:
!(https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221129065901.png)
显然NameServ的整个启动基本上是在为Nettty做了一系列周边服务,Netty是网络通信的核心框架。
### 访问入口
整个NameServ的入口为`org.apache.rocketmq.namesrv.NamesrvStartup#main0`,我们直接定位到相关代码。
```java
public static NamesrvController main0(String[] args) {
try {
// 1. 构建核心控制器
NamesrvController controller = createNamesrvController(args);
// 2. 启动控制器
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
```
### 构建核心控制器
NameServer一开始的工作是构建核心控制器,从整体上看主要做了下面几个操作:
1. 调用Apach Commons CLI 命令行解析工具进行命令解析。
2. 根据**运行时参数**生成commandLine命令行对象。
3. 创建NamesrvConfig和NettyServerConfig对象,读取`-c`指定的配置文件路径解析配置文件。
4. **namesrvConfig** 和 **nettyServerConfig** 对象进行初始化。
Apach Commons CLI 工具可以帮助开发者快速构建服务器启动命令参数,并且支持输出到列表。这里我们接着进入到 `org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController` 一探究竟。进入之后发现代码还不少,所以我们拆成**多个部分**分析。
下面是完整的代码:
```java
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
```
因为内容比较多,这里这里分段进行介绍,,首先是注册相关启动后命令:
```java
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 创建命令行参数对象,这里定义了 -h 和 -n参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 根据Options和运行时参数args生成命令行对象,buildCommandlineOptions定义了-c参数(Name server config properties file)和-p参数(Print all config item)
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
```
`ServerUtil.buildCommandlineOptions(new Options())`以及`org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions`方法内部的逻辑:
```java
// org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("h", "help", false, "Print help");
opt.setRequired(false);
options.addOption(opt);
opt =
new Option("n", "namesrvAddr", true,
"Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'");
opt.setRequired(false);
options.addOption(opt);
return options;
}
// org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config items");
opt.setRequired(false);
options.addOption(opt);
return options;
}
```
从个人来看这个方法并不直观,并且复用性比较低,个人比较倾向于改成下面的方式:
```java
public static Option buildCommandlineOption(String opt, String longOpt, boolean hasArg, String description, boolean required){
Option option = new Option(opt, longOpt, hasArg, description);
option.setRequired(required);
return option;
}
```
最后在本地个人把代码改造为下面的方式,虽然参数还需要优化,但是感觉直观了不少:
```java
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = new Options();
// Modified to a more intuitive way of adding commands
options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));
options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));
options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));
options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,
"Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;}
```
如果觉得惹眼可以把这一段放到写好的方法里面,经过个人倒腾之后最终的代码如下:
```java
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = buildCommandlineOptions(options);
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//......
public static Options buildCommandlineOptions() {
Options options = new Options();
// Modified to a more intuitive way of adding commands
options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));
options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));
options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));
options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,
"Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));
return options;
}
```
相信读者对于Apach Commons CLI 命令行解析工具进行命令解析有了大致的了解。Apach的命令行解析工具帮助开发者根据运行时候的参数构建命令行对象,之后再通过 `-c` 的参数决定是否读取配置文件,解析配置文件之后填充到**namesrvConfig**和**nettyServerConfig**对象中。
解析命令之后是填充配置到对应的对象,填充配置文件的配置代码如下:
```java
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
```
这一段算是`createNamesrvController(String[] args)`核心代码之一,作用是先创建**NettyServerConfig**以及**NamesrvConfig**对象,然后利用commandLine命令行工具读取`-c`指定的配置文件路径,这里用比较经典的缓冲流文件IO读取,之后生成Properties对象,这些代码基本都是JAVAEE基础,就不一一扣细节了。
当生成Properties对象完成之后,将namesrvConfig和nettyServerConfig对象进行初始化。接下来有一些不重要的代码,比如发现没有参数配置RocketMqHome会给出提示:
```java
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
```
再比如会根据RocketMqHome的根路径下固定路径加载`logback_namesrv.xml`日志配置文件,如果把日志重定向到自己其他磁盘路径,需要注意**conf** 这个层级文件夹以及日志配置文件一并拷贝。
```java
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
```
之后便是重点操作创建NamesrvController核心控制器了,这里面把namesrvConfig和nettyServerConfig载入到核心控制器待后续初始化使用,代码如下:
```java
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
// 记住所有的配置以防止丢弃
controller.getConfiguration().registerConfig(properties);
```
上面的代码水到渠成地利用namesrvConfig和nettyServerConfig对象**创建NamesrvController对象**,然后在注册一遍properties防止丢失。
注意这里使用了JUC的 `java.util.concurrent.locks.ReadWriteLock`读写锁进行操作
>ReadWriteLock 是什么,可以参考廖老师的博客:[使用ReadWriteLock - 廖雪峰的官方网站 (liaoxuefeng.com)](https://www.liaoxuefeng.com/wiki/1252599548343744/1306581002092578)
> 使用`ReadWriteLock`可以提高读取效率:
> -`ReadWriteLock`只允许一个线程写入;
> -`ReadWriteLock`允许多个线程在没有写入时同时读取;
> -`ReadWriteLock`适合读多写少的场景。
看完之后我们发现`createNamesrvController(String[] args)` 是非常重要的方法,内部的关键操作如下:
- 提供**namesrvConfig**和**nettyServerConfig**配置对象
- 创建**NamesrvControlle**r核心控制器
创建完核心控制器之后紧接着便是启动控制器,这里有着次重要级别的初始化操作:
```java
// 2. 启动控制器
start(controller);
```
### 初始化
创建核心控制器之后,紧接着是核心控制器的相关初始化动作,初始化的重要任务是下面几个:
- 初始化核心控制器,内部逻辑属于次重要级相关组件启动。
- 注册JVM钩子函数优雅关闭Netty和释放资源
- 核心控制器真正启动运行,实际上为触发Netty服务开启。
`org.apache.rocketmq.namesrv.NamesrvStartup#start` 初始化代码如下:
```java
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 对核心控制器进行初始化操作
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册一个钩子函数,JVM进程关闭时优雅地释放netty服务、线程池等资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null; }));
// 核心控制器启动操作
controller.start();
return controller;
}
```
`start()`的操作和创建核心控制器有点像,因为也是一个次重要级别的初始化操作。相关操作完成之后注册一个钩子函数优雅的释放Netty服务以及释放线程池的资源,最后对核心控制器进行启动操作。
我们继续深入核心控制器启动操作,`org.apache.rocketmq.namesrv.NamesrvController#initialize`代码如下:
```java
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();
// 创建Netty网络服务对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 创建定时任务--每个10s扫描一次Broker,并定时剔除不活跃的Broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 创建定时任务--每个10分钟打印一遍KV配置
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
// 省略SSL判断代码
return true;
}
```
这部分代码主要目的是对核心控制器进行启动前的一些初始化操作,包括下面一些内容:
- 根据上面方法初始化的**NamesrvConfig**的**kvConfigPath**(存储KV配置属性的路径)加载KV配置
- 创建两个定时任务:
- 每隔10s扫描一次Broker,并定时剔除不活跃的Broker
- 每隔10分钟打印一遍KV配置
这里的定时任务每次间隔10s扫描一次Broker,并定时剔除不活跃的Broker。
> 路由删除的逻辑放到后面进行介绍,这里暂时跳过
之后我们继续看核心控制器是如何启动的,方法入口为`org.apache.rocketmq.namesrv.NamesrvController#start`:
```java
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
```
非常简单的,代码其实就是启动一下Netty服务罢了,因为RocketMq 底层通信是依赖Netty的,不过Netty的细节不在本文的讨论范围,这里就不过多介绍挖掘细节了。
至此整个路由启动的代码完成。
## NameServ注册Broker
路由注册的时序图如下:
![路由注册](https://adong-picture.oss-cn-shenzhen.aliyuncs.com/adong/20221125062626.png)
路由注册简单来说就是Broker注册到NameServ的过程,主要是通过心跳包实现的,那么Broker在代码中是如何存储的呢?我们根据上面的时序图最后一步就可以直接找到答案,就是在 **RouteManager**里面,里面维护了下面的信息:
```java
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
```
和Spring 管理Bean差不多的套路,用的是万能的Map,上面定义的变量中比较重要的如下(和文章开头对应的一致):
- **topicQueueTable**:Topic消息队列路由信息,包括topic所在的broker名称,读队列数量,写队列数量,同步标记等信息,rocketmq根据topicQueueTable的信息进行负载均衡消息发送。
- **brokerAddrTable**:Broker节点信息,包括brokername,所在集群名称,还有主备节点信息。
- **clusterAddrTable**:Broker集群信息,存储了集群中所有的Brokername。
- **brokerLiveTable**:Broker状态信息,Nameserver每次收到Broker的心跳包就会更新该信息。
RocketMq在消息队列消费模式上使用的是发布订阅的模式设计,这在[[【RocketMq】RocketMq 扫盲]]中也有提到,这里不多赘述。
Broker中会存在一个Topic中有很多个Queue的情况,在默认的参数配置中RocketMq为每个新创建的Topic默认分配4个读队列和4个写队列,多个Broker还会组成集群,Broker还会定期向NameServ发送心跳包注册信息,NameServ则通过brokerLiveTable完成Broker节点状态的管理。
下面我们根据时序图一步步往下观察**NameServ注册Broker**的过程:
### 发送心跳包
上面我们分析了NameServ的启动代码,其实观察Broker的启动代码会发现有一定的相似之处,都是第一步构建一个控制器,然后start(),创建控制器这一部分内容不是重点这里跳过,我们接着看start()方法。
```java
public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
```
`controller.start();`是时序图的开始,下面是`org.apache.rocketmq.broker.BrokerController#start:`的内部代码:
```java
public void start() throws Exception {
// 此处省略相关依赖组件的start()过程
//.....
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 主从同步节点配置处理
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 初次启动强制发送心跳包
this.registerBrokerAll(true, false, true);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
```
**registerBrokerAll** 这个方法的参数可读性不太好,所以这里列举一下三个参数的顺序以及代码对应的参数数值:
- boolean checkOrderConfig(true)
- boolean oneway(false)
- boolean forceRegister(true)
搭配上参数之后就比较好懂了,也就是说加上配置校验以及强制执行一次注册动作,并且以非oneWay的方式发送一次心跳包。
下面我们顺利进入到 **registerBrokerAll()** 方法,方法内部首先创建topic包装类 ,然后会有一段比较有意思的代码,那就是如果没有读写权限会默认重新创建一个临时使用的topicConfigTable设置到Topic当中,之后是判断Broker此时是否需要执行发送心跳包。
但是我们回到上一级调用`this.registerBrokerAll(true, false, true);`这里的参数传递就会发现,实际上**forceRegister总是为true**,也就是说基本上每个Broker第一次初始化必定需要传递心跳包的:
```java
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 创建 TopicConfigSerializeWrapper,topic包装类
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 如果没有读写权限,此时会默认重新创建一个临时使用的topicConfigTable,作为Topic包装类的参数数值
// 个人认为这一步是防止空参数导致后面的方法出现异常,同时如果后续具备读写权限之后不需要重新创建直接使用
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
// 这里初始化的值可以使用默认的Topic配置数量,比如加上topicConfigWrapper.getTopicConfigTable().values().size()
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(topicConfigWrapper.getTopicConfigTable().values().size());
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 判断Broker是否需要发送心跳包
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 执行发送心跳包
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
```
下面我们接着定位到`needRegister`方法进行解读,这里我们主要定位到`org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister`方法,这里截取关键代码如下:
```java
brokerOuterExecutor.execute(() -> {
try {
QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
request.setBody(topicConfigWrapper.getDataVersion().encode());
// 同步远程调用到路由中心
RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
DataVersion nameServerDataVersion = null;
Boolean changed = false;
// 省略代码:根据返回代码进行判断处理
//..
log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
} catch (Exception e) {
changedList.add(Boolean.TRUE);
log.error("Query data version from name server {}Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
});
```
这个代码不难理解,算是我们平常写HTTP调用的一个变体,我们可以通过**RequestCode.QUERY_DATA_VERSION**,查到NameServer的接受处理代码。
利用IDEA我们很快发现`org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest`方法入口,之后进入到`org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig`方法,然后这里看到对应代码如下:
```java
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
final QueryDataVersionRequestHeader requestHeader =
(QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
// 内部处理:如果dataVersion为空或者当前dataVersion不等于brokerLiveTable存储的brokerLiveTable,Broker就需要发送心跳包
Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
if (!changed) {
// 更新Broker信息
this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr(), System.currentTimeMillis());
}
DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
if (nameSeverDataVersion != null) {
response.setBody(nameSeverDataVersion.encode());
}
responseHeader.setChanged(changed);
return response;
}
```
我们进入到关键判断代码`org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged`
:
```java
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
// 如果dataVersion为空或者当前dataVersion不等于brokerLiveTable存储的brokerLiveTable,Broker就需要发送心跳包
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
```
Broker是否需要发送心跳包由该Broker在路由中心`org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion`决定,如果dataVersion为空或者当前dataVersion不等于brokerLiveTable存储的**brokerLiveTable**,Broker就需要发送心跳包。
### Nameserver处理心跳包
Nameserver的netty服务监听收到心跳包之后,会调用到路由中心以下方法进行处理,具体的方法入口为:`org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker`
```java
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 获取集群下所有的Broker,并将当前Broker加入clusterAddrTable,由于brokerNames是Set结构,并不会重复
Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
boolean registerFirst = false;
// 获取Broker信息,如果是首次注册,那么新建一个BrokerData并加入brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
//从库切换主库:首先删除namesrv中的<1, IP:PORT>,然后添加<0, IP:PORT>。
//同一个IP:端口在brokerAddrTable中只能有一条记录。
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
log.debug("remove entry {} from brokerData", item);
it.remove();
}
}
// 里判断Broker是否是已经注册过
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 如果是Broker是Master节点吗,并且Topic信息更新或者是首次注册,那么创建更新topic队列信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新BrokerLiveInfo状态信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果不是MASTER_ID,则返回结果返回masterAddr。
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
```
上面的代码是**Broker心跳包的最核心方法**,它主要做了下面几件事:
- RouteInfoManager路由信息的更新操作
- clusterAddrTable 更新;
- brokerAddrTable 更新;
- topicQueueTable 更新;
- brokerLiveTable 更新;
## 定期排除Broker
根据理论学习我们知道,NameServ在启动的时候会创建一个定时任务,定时剔除不活跃的Broker。这一点的源码在`org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker`中可以找到答案。
此外在单元测试中就有关于这一项定期清理的测试,也是比较快的找到入口的办法:`org.apache.rocketmq.namesrv.routeinfo.RouteInfoManagerBrokerRegisterTest#testScanNotActiveBroker`:
这个测试非常简单直观我们:
```java
private static RouteInfoManager routeInfoManager;
public static String clusterName = "cluster";
public static String brokerPrefix = "broker";
public static String topicPrefix = "topic";
public static int brokerPerName = 3;
public static int brokerNameNumber = 3;
@test
public void testScanNotActiveBroker() {
for (int j = 0; j < brokerNameNumber; j++) {
String brokerName = getBrokerName(brokerPrefix, j);
for (int i = 0; i < brokerPerName; i++) {
String brokerAddr = getBrokerAddr(clusterName, brokerName, i);
// set not active
routeInfoManager.updateBrokerInfoUpdateTimestamp(brokerAddr, 0);
assertEquals(1, routeInfoManager.scanNotActiveBroker());
}
}
}
```
在启动单元测试之前会先构建10个Broker节点注册进去,这里单元测试细心的使用了多个集群模拟生产环境:
```java
private static RouteInfoManager routeInfoManager;
public static String clusterName = "cluster";
public static String brokerPrefix = "broker";
public static String topicPrefix = "topic";
public static int brokerPerName = 3;
public static int brokerNameNumber = 3;
@Before
public void setup() {
routeInfoManager = new RouteInfoManager();
cluster = registerCluster(routeInfoManager,
clusterName,
brokerPrefix,
brokerNameNumber,
brokerPerName,
topicPrefix,
10);
}
```
之后我们直接跑一边单元测试,在日志中单元测试为我们展示了详细的测试流程:
1. 首先是构建broker注册,内部会塞入一些测试数据的Topic进行填充。
```java
06:54:23.353 INFO RocketmqNamesrv - new broker registered, cluster-broker-0:0 HAServer: cluster-broker-0:0
06:54:23.353 INFO RocketmqNamesrv - new broker registered, cluster-broker-0:1 HAServer: cluster-broker-0:1
06:54:23.353 INFO RocketmqNamesrv - new broker registered, cluster-broker-0:2 HAServer: cluster-broker-0:2
06:54:23.353 INFO RocketmqNamesrv - cluster brokerName master address change from null to cluster-broker-1:0
06:54:23.353 INFO RocketmqNamesrv - new broker registered, cluster-broker-1:0 HAServer: cluster-broker-1:0
06:54:23.355 INFO RocketmqNamesrv - new broker registered, cluster-broker-1:1 HAServer: cluster-broker-1:1
06:54:23.355 INFO RocketmqNamesrv - new broker registered, cluster-broker-1:2 HAServer: cluster-broker-1:2
06:54:23.355 INFO RocketmqNamesrv - cluster brokerName master address change from null to cluster-broker-2:0
06:54:23.355 INFO RocketmqNamesrv - new broker registered, cluster-broker-2:0 HAServer: cluster-broker-2:0
06:54:23.355 INFO RocketmqNamesrv - new broker registered, cluster-broker-2:1 HAServer: cluster-broker-2:1
06:54:23.355 INFO RocketmqNamesrv - new broker registered, cluster-broker-2:2 HAServer: cluster-broker-2:2
```
2. 接着便是根据单元测试的代码进行遍历排除Broker节点,在循环的最后调用扫描检查不活跃Broker。这里为了验证直接设置lastUpdateTimestamp(最后更新时间)让Broker存活验证周期提前结束验证扫描效果。
```java
06:55:34.483 INFO RocketmqRemoting - closeChannel: close the connection to remote address result: true
06:55:34.483 WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:0 120000ms
06:55:34.483 INFO RocketmqNamesrv - remove brokerAddr from brokerAddrTable, because channel destroyed
06:55:34.483 INFO RocketmqRemoting - closeChannel: close the connection to remote address result: true
06:55:34.483 WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:1 120000ms
06:55:34.483 INFO RocketmqNamesrv - remove brokerAddr from brokerAddrTable, because channel destroyed
06:55:34.483 INFO RocketmqRemoting - closeChannel: close the connection to remote address result: true
06:55:34.483 WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:2 120000ms
06:55:34.484 INFO RocketmqNamesrv - remove brokerAddr from brokerAddrTable, because channel destroyed
06:55:34.484 INFO RocketmqNamesrv - remove brokerName from brokerAddrTable, because channel destroyed
06:55:34.484 INFO RocketmqNamesrv - remove brokerName, clusterName from clusterAddrTable, because channel destroyed
```
以上便是单元测试的大致内容,我们接着看看具体的代码即可,这里还是用了迭代器模式进行遍历删除,又是一个经典的设计模式:
```java
public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2 = 120秒,在单元测试中这里的last被设置为0所以必然超时
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
```
剔除Broker信息的逻辑比较简单,首先从**BrokerLiveInfo**获取状态信息,判断Broker的心跳时间是否已超过限定值(默认120秒),若超过之后就执行剔除操作。
# 写在最后
分析完了rocketmq自带的路由中心源码,其实我们自己实现一个路由中心貌似也不难。NameServ小而美的设计非常取巧,当然仅仅几百行代码确实还是存在比较多的不完美之处,很多方案需要开发人员自己编写业务代码兜底,但是有因为设计简单负责的任务,使用并且业务代码扩展性很强,维护成本低并且性能不错。
NameServ作为整个RocketMq的核心用法上简单的同时非常适合作为Rocketmq的切入点,个人在阅读代码中也会尝试修改代码查看效果,自己参与到源码编写和改造过程这会对代码编写者的思路更为清晰理解,也算是一个源码阅读的小技巧吧。
# 参考资料
- (https://objcoding.com/2019/03/11/rocketmq-nameserver/#%E8%B7%AF%E7%94%B1%E5%88%A0%E9%99%A4) 有空了试一下,之前都是买阿里云现成的服务 想讨论一下你们公司使用RocketMq的业务场景,对业务感兴趣一些,还想问RocketMq比Kafka好用吗,优点在哪,你为什么选择RocketMq呢 1124480274 发表于 2022-11-10 17:23
想讨论一下你们公司使用RocketMq的业务场景,对业务感兴趣一些,还想问RocketMq比Kafka好用吗,优点在哪, ...
主要是我公司项目需要保证消息投递可靠性,并且考虑到纯JAVA编写遇到看源码排查问题的场景也能应付,而且RocketMq使用的公司还是挺多的,架构和各种问题的解决方案都比较成熟。 zxdsb666. 发表于 2022-11-10 20:40
主要是我公司项目需要保证消息投递可靠性,并且考虑到纯JAVA编写遇到看源码排查问题的场景也能应付,而且 ...
原来如此,感谢分享
页:
[1]