cannl同步mysql数据到es中
canal组件介绍
canal-admin
(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
canal-server:
服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。
canal-adapter
客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。
本次实验环境
首先我们需要下载canal的各个组件canal-server
、canal-adapter
、canal-admin
,下载地址:https://github.com/alibaba/canal/releases
es及相关工具的部署可参考Elasticsearch详解及部署
1、开启mysql的binlog
使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式
vi /etc/my.cnf
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
配置完成后重启mysql,并查询是否配置生效:ON就是开启
systemctl restart mysqld
可以进入数据库再次查看下
mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.01 sec) mysql> show variables like 'binlog_format%'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.00 sec)
2、mysql用户数据准备
创建一个canal用户并对其进行授权
CREATE USER canal IDENTIFIED BY 'Cjz123456.'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
创建一个测试用户数据库及表
create database canal;
USE canal; CREATE TABLE product ( id bigint(20) NOT NULL AUTO_INCREMENT, title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, sub_title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, price decimal(10, 2) NULL DEFAULT NULL, pic varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (id) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
3、创建存放canal的目录
mkdir /usr/local/canal-server
mkdir /usr/local/canal-adpter
4、部署canal-server
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-server
vim /usr/local/canal-server/conf/example/instance.properties
# 需要同步数据的MySQL地址 canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # 用于同步数据的数据库账号 canal.instance.dbUsername=canal # 用于同步数据的数据库密码 canal.instance.dbPassword=Cjz123456. # 数据库连接编码 canal.instance.connectionCharset = UTF-8 # 需要订阅binlog的表过滤正则表达式 canal.instance.filter.regex=.*//..*
cd /usr/local/canal-server/bin
./startup.sh
查看日志查看是否正常启动
cd /usr/local/canal-server/logs/
tail -f example/example.log
tail -f canal/canal.log
5、部署canal-adapter
tar -zxvf canal.adapter-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-adpter/
cd /usr/local/canal-adpter/conf
vim application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # 客户端的模式,可选tcp kafka rocketMQ flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效 zookeeperHosts: # 对应集群模式下的zk地址 syncBatchSize: 1000 # 每次同步的批数量 retries: 0 # 重试次数, -1为无限重试 timeout: # 同步超时时间, 单位毫秒 accessKey: secretKey: consumerProperties: canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://192.168.111.129:3306/canal?useUnicode=true&charaterEncoding=utf-8&useSSL=false username: canal password: Cjz123456. canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 hosts: http://192.168.111.129:9200 # 127.0.0.1:9200 for rest mode properties: mode: rest # or rest security.auth: elastic:elastic # only used for rest mode cluster.name: my-es
修改 canal-adapter/conf/es7/mytest_user.yml
文件,用于配置MySQL
中的表与Elasticsearch
中索引的映射关系
vim es7/mytest_user.yml
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example # canal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: canal_product # es 的索引名称 _id: _id # 将Mysql表里的id对应上es上的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 sql: "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p" # sql映射 etlCondition: "where p.id>={}" #etl的条件参数 commitBatch: 3000 # 提交批大小
cd /usr/local/canal-adpter/bin/
./startup.sh
查看日志看是否正常启动
cd /usr/local/canal-adpter/logs/adapter/
tail -f adapter.log
6、创建ES索引
可以直接通过Kibana页面进行创建,不难看出,索引的属性对应着就是我们表的属性(Mysql表id与es的_id做了对应,所以这里不需要设置)
PUT canal_product { "mappings": { "properties": { "title": { "type": "text" }, "sub_title": { "type": "text" }, "pic": { "type": "text" }, "price": { "type": "double" } } } }
7、验证
canal初始化是不会全量同步数据的,所以我们需要登录mysql,插入一条数据
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 15, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL );
创建完成后在Kibana上查看即可发现数据已经同步
GET canal_product/_search
在elasticsearch-head上查看也是,可以看出,我们的表id变成了es的_id
在Mysql上修改数据看看
UPDATE product SET title='小米10' WHERE id=15;
可以看到我们修改的数据也进行了同步
该文章参考于:https://blog.csdn.net/zh1998wx/article/details/123101442
原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/244841.html