使用 docker 运行 RocketMQ + Canal + ElasticSearch + Golang 示例
本帖最后由 thepoy 于 2021-5-8 20:21 编辑### 0 引言
在很多业务情况下,我们都会在系统中引入`ElasticSearch`搜索引擎作为做全文检索的优化方案。
如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新`ElasticSearch`的代码。
下面我会以一个`blog文章管理`为例来演示`canal`+`RocketMQ`用`Golang`实现`MySQL`与`ElasticSearch`的数据同步。
示例地址:https://gitee.com/thepoy/RocketMQ_Canal_ElasticSearch_Golang
尽量不要在 macOS 中使用,创建的容器多多少少会有问题,出问题时很难找到症结所在,而在 linux 系统中使用则一切正常。
### 1 RocketMQ
`RocketMQ`是没有官方镜像的,所以需要在本地创建:
```shell
cd rocketMQ
docker build --no-cache -f Dockerfile -t rocketmq:4.8.0 --build-arg version=4.8.0 .
```
> 可根据自己的需求对 Dockerfile 进行修改
修改环境变量文件`.env`中的主机地址为自己的 ip 地址,然后使用 rocketMQ 目录中的配置文件创建容器:
```shell
docker-compose --file compose.yml up
```
### 2 Canal
#### 2.1 创建容器
使用项目根目录中的配置文件创建`mysql`、`canal-admin`和`canal-server`容器:
```shell
cd ..
docker-compose --file compose.yml up
```
也有一个环境变量文件需要修改,另外,compos 文件中的信息也需要根据需要修改,如 mysql 的 root 密码。
#### 2.2 为 canal 账号授权
创建 mysql 容器时也创建了 canal 账号,需要为这个账号授权。
```sql
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
#### 2.3 打开 canal 管理后台
打开[管理后台](http://localhost:8089),打开后需要用`admin`账号登录,默认密码为`123456`,管理后台的界面如下图所示:
!(https://z3.ax1x.com/2021/05/07/g3Ftx0.png)
因为 compose.yml 文件中已经配置了 canal-server,所以在后台中能看见已经启动的一个 server。
#### 2.4 配置实例 / Instance
点击侧边栏的`Instance管理`,选择`新建 Instance`,选择那个唯一的主机,再点击`载入模板`,修改下面的一些参数:
```ini
# 取消第 3 行中 mysql slaveId 的注释,随便修改为一个数字(不能是 1,因为 mysql 的 server_id=1)
canal.instance.mysql.slaveId=1234
# 修改 mysql 的地址,canal-admin 容器中也有一个 mysql 实例,我们不使用这个 mysql,而使用单独的 mysql 容器
canal.instance.master.address=192.168.31.129:3306
# 改成自己的数据库信息(需要监听的数据库,新建一个 database 就可以),这一行需要添加
canal.instance.defaultDatabaseName = blog
# table regex 需要过滤的表 这里数据库的中所有表
canal.instance.filter.regex = .\*\\..\*
# MQ 配置 日志数据会发送到 blog_articles 这个 topic 上
canal.mq.topic=blog_articles
```
实例名称随便填一个就行。
创建好的新实例默认是停止状态,将其启动。
!(https://z3.ax1x.com/2021/05/07/g3ETJJ.png)
创建 database 和 table:
```sql
CREATE DATABASE IF NOT EXISTS `blog`;
USE blog;
CREATE TABLE IF NOT EXISTS `blog_articles` (
`id` INT AUTO_INCREMENT PRIMARY KEY NOT NULL,
`title` VARCHAR(100) NOT NULL UNIQUE,
`content` TEXT NOT NULL,
`created_date` VARCHAR(10) NOT NULL
);
```
#### 2.5 配置 canal-server
!(https://z3.ax1x.com/2021/05/07/g3VSFe.png)
修改下面的参数:
```ini
# 默信是 tcp, 修改为 rocketMQ
canal.serverMode = rocketMQ
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = blog
rocketmq.namesrv.addr = 192.168.31.129:9876
```
保存后 server 会重启,这时打开 (http://localhost:8080/#/topic),能够看到新增加了一个主题`blog_articles`:
!(https://z3.ax1x.com/2021/05/07/g3eZGQ.png)
可以通过添加一行数据来测试是否成功:
```sql
INSERT INTO blog.blog_articles
(title, content, created_date)
VALUES('test1', '这是第 1 个测试文章', '2020-01-01');
```
添加后,在 rocketMQ 控制台查看消息:
!(https://z3.ax1x.com/2021/05/07/g3m9W4.png)
!(https://z3.ax1x.com/2021/05/07/g3mZTK.png)
可以看到,添加数据的消息已经产生等待消费。
### 3 Elasticsearch
elasticsearch 容器会在使用配置文件创建 Canal 时一同创建,需要注意的是,如果你想修改 elasticsearch 的 tag,可以在`.env`文件中修改`ES_TAG`的值。
我没有创建 Kibana 容器,有需要的话可以自行创建。
### 4 代码设计
当数据库发生变化时,Canal 会将变化信息发送到 RocketMQ 中,所以我们只需要消费 RocketMQ 中的消息就可以做到即时或很快地将变化的数据同步到 Elasticsearch 中。
#### 4.1 RocketMQ
##### 常量
```go
const (
// topic 在 Canal 中已经配置了,这里一定不能写错
topic string = "blog_articles"
// 消费者组可以自定义,但要与 2.5 节中设置的 rocketmq.producer.group 相同
consumerGroup string = "blog"
)
```
从环境变量中获取`host`,并生成`server`:
```go
var (
server string
Host string
)
func init() {
Host = os.Getenv("HOST")
if Host == "" {
Host = "localhost"
}
server = Host + ":9876"
}
```
##### 结构体的设计
虽然代码中没有用到这个结构体,但我觉得需要拿出来聊一聊:
```go
type ChangedData struct {
// 变化的文档集合
Data []es.Document `json:"data"`
// 发生变化的数据库
Database string `json:"database"`
// 数据库内执行时间
ES uint64 `json:"es"`
// 就是 id
ID uint `json:"id"`
// 是否为 DDL 语句,create database、create table、alter table
IsDDL bool `json:"isDdl"`
// 表结构的字段类型
MysqlType mapstring `json:"mysqlType"`
// 主键名称
PrimaryKeyNames []string `json:"pkNames"`
// sql 语句
SQL string `json:"sql"`
// sql 语句类型
SqlType mapuint `json:"sqlType"`
// 表名称
Table string `json:"table"`
// 操作类型,(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
Type string `json:"type"`
// 数据库内解析时间
Timestamp uint `json:"ts"`
// 旧数据
Old []mapstring `json:"old"`
}
```
其中`es.Document`结构如下:
```go
type Document struct {
ID string `json:"id,omitempty"`
Title string `json:"title,omitempty"`
Content string `json:"content,omitempty"`
CreatedDate string `json:"created_date,omitempty"`
}
```
##### 使用第三方 json 库
这也是为什么没用到上面的结构体的原因。
使用 json 标准库处理消息数据并同步到 es 中,完全是小题大做,会浪费很多的性能。
```go
data := gjson.Get(string(msg.body), "data")
```
使用 gjson 库,可以方便地从 json 字符串中获取想要的数据,并进行后续处理,无需将整个 json 反序列化。
##### 使用 context 阻塞或退出消费线程
启动消费订阅后,阻塞多久,就会消费多久,为了能够控制何时结束消费,这里使用`context`的`cancle()`函数控制:
```go
err = c.Start()
...
select {
case <-ctx.Done():
fmt.Println(strings.Repeat("*", 60))
fmt.Println("shutdown consumer")
fmt.Println(strings.Repeat("*", 60))
}
err = c.Shutdown()
...
```
#### 4.2 Elasticsearch
es 的代码是通用的,没有特别说明的意义,直接看代码即可。
#### 4.3 二者结合
结合 RocketMQ 和 Elasticsearch 的代码,就能完成**消息的即时消费**和**文档的即时更新**。
##### 需要从消息中取出的数据
上面的结构体对每个字段都有注释,此示例只取`data`、`old`和`type`三个字段:
```go
// 将消息体解析成 gjson.Result
body := gjson.Parse(string(msg.Body))
// 从消息体中取 data
data := body.Get("data").Array()
// 从消息体中取 old
old := body.Get("old").Array()
// 从消息体中取 type
canalTypeStr := body.Get("type").String()
```
##### 根据不同的操作以不同的方式更新数据
本示例中的仅包括非 DDL 操作,仅限于基本的增、删、改,因为数据已同步到 es 中,所以 查 应该在 es 中进行。
```go
switch canalType {
case canal.DELETE:
...
case canal.UPDATE:
...
case canal.INSERT:
...
default:
log.Fatal("未知操作", canalType)
}
```
### 5 操作结果
设置环境变量(可选操作):
```shell
export HOST=192.168.31.129
```
运行示例:
```go
go run main.go
```
然后在数据库中添加一篇文章:
```sql
INSERT INTO blog.blog_articles
(title, content, created_date)
VALUES('test9', '这是第 9 篇测试文章', '2020-01-01');
```
在终端中就能看见日志:
```
...
2021/05/08 15:05:41 已创建新的文档: map
...
```
在 es 中查询一下`id=10`的文档:
```shell
curl -X GET "http://localhost:9200/canal_es/_doc/10?pretty"
```
查询结果:
```json
{
"_index" : "canal_es",
"_type" : "_doc",
"_id" : "10",
"_version" : 1,
"_seq_no" : 14,
"_primary_term" : 1,
"found" : true,
"_source" : {
"content" : "这是第 9 篇测试文章",
"created_date" : "2020-01-01",
"id" : "10",
"title" : "test9"
}
}
```
在数据库中更新一下这篇文章的创建日期:
```sql
UPDATE blog.blog_articles
SET created_date='2009-04-15'
WHERE id=10;
```
终端日志:
```
2021/05/08 15:15:08 文档已存在,即将更新...
{"_index":"canal_es","_type":"_doc","_id":"10","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":15,"_primary_term":1}
2021/05/08 15:15:08 已更新文档:id=10, new-data=map
```
再查询一下这篇文章信息,结果为:
```json
{
"_index" : "canal_es",
"_type" : "_doc",
"_id" : "10",
"_version" : 2,
"_seq_no" : 15,
"_primary_term" : 1,
"found" : true,
"_source" : {
"content" : "这是第 9 篇测试文章",
"created_date" : "2009-04-15",
"id" : "10",
"title" : "test9"
}
}
```
可见,创建日期已经更新。
下面删除这篇文章:
```sql
DELETE FROM blog.blog_articles
WHERE id=10;
```
终端日志:
```
2021/05/08 15:18:03 即将删除文档10
2021/05/08 15:18:03 已删除: {"id":"10","title":"test9","content":"这是第 9 篇测试文章","created_date":"2009-04-15"}
```
再查询一下这篇文档:
```json
{
"_index" : "canal_es",
"_type" : "_doc",
"_id" : "10",
"found" : false
}
```
es 中也已删除此文章。
----
示例结束。
页:
[1]