King's Studio

使用阿里开源神器canal将MySQL数据实时同步到Elasticsearch

字数统计: 1.2k阅读时长: 4 min
2021/04/02 Share

最近工作中需要使用Elasticsearch作为搜索引擎实现海量数据的搜索功能,以前也对Elasticsearch做过介绍了,包括使用Docker快速部署Elasticsearch进行学习,这是第一次在工作中使用到,需要会的东西也相应的更全面一些,包括复杂的条件查询,分页查询等,这部分Elasticsearch的操作会放到后面的博客中进行专门介绍,包括使用kibana开发者工具快速操作Elasticsearch。在学会Elasticsearch的API之后,不禁就会思考Elasticsearch的数据源从哪里来呢,以前学习时并没有思考过这个问题,后来在架构师的提示下,思考了将MySQL数据同步到Elasticsearch的方式,之后所有的查询均面对Elasticsearch进行操作,那么今天的主题来了,就是使用github上拥有star数达到18.3k的canal进行数据同步。

在github上搜索canal就能看到它的详细介绍,canal支持多目标,包括MySQL、kafka、Elasticsearch、Hbase等,可以说是非常强大,在这里我就不再对canal进行详细的介绍了,github中它的主页全都有,我就放一张具有canal精神的图。

canal

MySQL开启binlog写入功能

其实canal的启动主要包括三部分,canal-deployer即server端,canal-adapter端,canal-client端,由于在工作中时间比较紧,client客户端没有进行部署,当然canal的核心也就是server和adapter,在部署服务之前我们首先需要确定自己的环境的三个中间件的版本。

中间件 版本
MySQL 5.7
Elasticsearch 6.4.2
canal-deployer 1.1.4
canal-adapter 1.1.4

如果你的中间件版本和我不同,比如Elasticsearch可能用到7及以上,那么你可能需要下载最新版本的canal才能进行支持,版本这个东西很有魔性,可以通过尝试试出正确的对应版本,如果都是高版本的建议直接上最新的canal。

由于canal是通过订阅MySQL的binlog来实现数据同步的,所以我们需要开启MySQL的binlog写入功能,并设置binlog-format为ROW模式,我们需要去修改mysqld的配置文件,配置如下:

mysql

配置完成后需要重启MySQL,执行sql语句:

1
show variables like '%log_bin%';

查看binlog是否启用,结果如下:

mysql

再执行sql查看binlog的模式:

1
show variables like'binlog_format%';

结果如下图:

mysql

接下来需要创建一个拥有从库权限的账号,用于订阅binlog:

1
2
3
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

canal-deployer的启动

将我们下载好的压缩包canal.deployer-1.1.4.tar.gz上传到Linux服务器,并进行解压:

1
tar -zxvf canal.deployer-1.1.4.tar.gz

修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置,这边server主要是与数据源进行连通:

mysql

然后进入deployer的bin目录,执行启动脚本:

1
./startup.sh

启动完成后,可以使用tail命令查看启动日志,是否有报错信息:

1
2
cd /usr/local/canal/logs/example
tail -200f example.log

日志如下,没有报错信息即为成功:

mysql

canal-adapter的启动

将我们下载好的压缩包canal.adapter-1.1.4.tar.gz上传到Linux服务器,进行解压:

1
tar -zxvf canal.adapter-1.1.4.tar.gz

下面我们需要修改adapter的配置文件,这边主要是将数据源和目标进行连通,并且我们在这里需要指定Elasticsearch的索引和类型,以及确定将哪一张源表同步到Elasticsearch中:

1
vi application.yml

mysql

mysql

然后我们进行源表和索引对应关系的配置:

1
2
3
cd /canal-adapter/conf/es
#此处的yml文件名为对应的索引名称,这个可以自己定义
vi daac0118_da_dir_content.yml

mysql

这些配置都完成后,我们就可以先启动adapter:

1
2
cd /usr/local/canal-adapter/bin
./startup.sh

启动完成后我们可以查看启动日志的情况:

mysql

看到如上的日志即表示启动成功。

创建对应的Elasticsearch索引

在这里我推荐使用kibana的开发者工具,操作Elasticsearch非常方便,在这里我将我的创建索引的代码放出来,以供参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
PUT daac0118_da_dir_content
{
"mappings":{
"da_dir_content":{
"properties":{
"DADIRCODE": {
"type": "text"
},
"DADIRPARENTCODE": {
"type": "text"
},
"DADIRSUBCODE": {
"type": "text"
},
"DADIRTYPE": {
"type": "text"
},
"DADIRTPLP": {
"type": "text"
},
"DADIRISSTOP": {
"type": "text"
},
"DADIRNAME": {
"type": "text"
},
"DADIRORDER": {
"type": "text"
},
"DAVERSION": {
"type": "text"
},
"DAMEMO1": {
"type": "text"
},
"DAMEMO2": {
"type": "text"
},
"DAMEMO3": {
"type": "text"
},
"CREATER": {
"type": "text"
},
"CREATETIME": {
"type": "date"
},
"UPDATER": {
"type": "text"
},
"UPDATETIME": {
"type": "date"
}
}
}
}
}

创建完成后,我们向源表中插入数据,同时查看adapter的日志情况:

mysql

可以看到adapter已经识别到了我们的操作,我们再去kibana中查看对应的索引文档情况:

mysql

可以看到已经能够查询到正确的数据,删除操作也按照此步骤,同样能够看到正确的结果。

CATALOG
  1. 1. MySQL开启binlog写入功能
  2. 2. canal-deployer的启动
  3. 3. canal-adapter的启动
  4. 4. 创建对应的Elasticsearch索引