Canal: Syncing Data to MySQL
I. Java implementation
First, let's hand-write the flow in Java, which makes the business logic easier to follow later.
1. Maven configuration:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.24</version>
</dependency>
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
2. application.properties configuration
# service port
server.port=10000
# application name
spring.application.name=canal-client
# environment: dev, test, prod
spring.profiles.active=dev
# MySQL connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=603409875
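The examples that follow operate on a demo table tb_commodity_info, whose DDL is not shown in the post. A plausible definition, inferred from the test INSERT in section 4 (column types and lengths are assumptions):

CREATE TABLE `tb_commodity_info` (
    `id` varchar(32) NOT NULL COMMENT 'primary key (assumed type)',
    `commodity_name` varchar(512) DEFAULT NULL COMMENT 'commodity name (assumed)',
    `commodity_price` varchar(36) DEFAULT NULL COMMENT 'price, inserted as a string in the demo (assumed)',
    `number` int DEFAULT NULL COMMENT 'quantity (assumed)',
    `description` varchar(2048) DEFAULT NULL COMMENT 'description (assumed)',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;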
3. Implementation
- First, declare a queue to hold the generated SQL statements, along with the DataSource used for JDBC later:

// SQL queue
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

@Resource
private DataSource dataSource;
- The main processing loop:

// create the connection
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
int batchSize = 1000;
try {
    connector.connect();
    connector.subscribe("cancal_mysql\\..*"); // subscribe to all tables in the cancal_mysql database
    connector.rollback();
    try {
        while (true) {
            // try to pull up to batchSize records from the master; take as many as are available
            Message message = connector.getWithoutAck(batchSize);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                Thread.sleep(1000);
            } else {
                // process the data
                dataHandle(message.getEntries());
            }
            connector.ack(batchId);
            // once the SQL accumulated in the queue exceeds the threshold, execute it
            if (SQL_QUEUE.size() >= 1) {
                executeQueueSql();
            }
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
} finally {
    connector.disconnect();
}
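All of these snippets live in a single Spring-managed class. A minimal sketch of the wiring (the class name, the CommandLineRunner, and the background thread are assumptions, not from the original post):

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CanalClient implements CommandLineRunner {

    // the SQL_QUEUE and dataSource fields from the first snippet go here

    @Override
    public void run(String... args) {
        // the pull loop blocks forever, so start it on its own thread
        // to avoid blocking Spring Boot startup
        new Thread(this::createConnect, "canal-client").start();
    }

    private void createConnect() {
        // body: the main processing loop shown above
    }
}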
- The data-dispatch method:

/**
 * Process the entries and dispatch by event type
 *
 * @param entrys
 */
private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
    for (CanalEntry.Entry entry : entrys) {
        if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            if (eventType == CanalEntry.EventType.DELETE) {
                saveDeleteSql(entry);
            } else if (eventType == CanalEntry.EventType.UPDATE) {
                saveUpdateSql(entry);
            } else if (eventType == CanalEntry.EventType.INSERT) {
                saveInsertSql(entry);
            }
        }
    }
}
- Methods that parse the row data and save the SQL statements:
- Delete (only a simple delete statement is generated; the methods below follow the same pattern):
/**
 * Save a delete statement
 *
 * @param entry
 */
private void saveDeleteSql(CanalEntry.Entry entry) {
    try {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
            StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
            for (CanalEntry.Column column : columnList) {
                if (column.getIsKey()) {
                    // only single-column primary keys are supported for now
                    if (column.getMysqlType().contains("varchar")) {
                        sql.append(column.getName() + "='" + column.getValue() + "'");
                    } else {
                        sql.append(column.getName() + "=" + column.getValue());
                    }
                    break;
                }
            }
            SQL_QUEUE.add(sql.toString());
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}
- Insert:
/**
 * Save an insert statement
 *
 * @param entry
 */
private void saveInsertSql(CanalEntry.Entry entry) {
    try {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
            StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
            for (int i = 0; i < columnList.size(); i++) {
                sql.append(columnList.get(i).getName());
                if (i != columnList.size() - 1) {
                    sql.append(",");
                }
            }
            sql.append(") VALUES (");
            for (int i = 0; i < columnList.size(); i++) {
                sql.append("'" + columnList.get(i).getValue() + "'");
                if (i != columnList.size() - 1) {
                    sql.append(",");
                }
            }
            sql.append(")");
            SQL_QUEUE.add(sql.toString());
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}
- Update:
/**
 * Save an update statement
 *
 * @param entry
 */
private void saveUpdateSql(CanalEntry.Entry entry) {
    try {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
            StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
            for (int i = 0; i < newColumnList.size(); i++) {
                sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");
                if (i != newColumnList.size() - 1) {
                    sql.append(",");
                }
            }
            sql.append(" where ");
            List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : oldColumnList) {
                if (column.getIsKey()) {
                    // only single-column primary keys are supported for now
                    if (column.getMysqlType().contains("varchar")) {
                        sql.append(column.getName() + "='" + column.getValue() + "'");
                    } else {
                        sql.append(column.getName() + "=" + column.getValue());
                    }
                    break;
                }
            }
            SQL_QUEUE.add(sql.toString());
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}
- JDBC execution:

/**
 * Write a statement to the target database
 *
 * @param sql
 */
public void execute(String sql) {
    Connection con = null;
    try {
        if (null == sql) return;
        con = dataSource.getConnection();
        QueryRunner qr = new QueryRunner();
        int row = qr.update(con, sql);
        System.out.println("update: " + row);
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        DbUtils.closeQuietly(con);
    }
}
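The main loop also calls executeQueueSql(), which is never listed in the post. A minimal sketch, assuming it simply drains the queue and hands each statement to execute():

/**
 * Drain the queue and execute each statement in order.
 * NOTE: sketch only; this method is referenced above but not shown in the original post.
 */
public void executeQueueSql() {
    int size = SQL_QUEUE.size();
    for (int i = 0; i < size; i++) {
        String sql = SQL_QUEUE.poll();
        System.out.println("[sql] ---> " + sql);
        this.execute(sql);
    }
}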
4. Result
Run the following SQL statement:
INSERT INTO `tb_commodity_info` ( `id`, `commodity_name`, `commodity_price`, `number`, `description` )
VALUES
( '030acbd3b71011ecb9760242ac110005', '测试0001', '5.88', 11, '描述信息0001' );
Execution result:
Database 1 (source): (screenshot)
Database 2 (target): (screenshot)
II. Implementation with canal.adapter
The code above can handle data migration in the common case. Next, let's implement the same sync based on the adapter.
1. Install canal.adapter
- First, download the adapter from GitHub:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
- Unpack it:
mkdir /tmp/canal_adapter
tar zxvf canal.adapter-1.1.5.tar.gz -C /tmp/canal_adapter
- Edit the configuration file conf/application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    #kafka.bootstrap.servers: 127.0.0.1:9092
    #kafka.enable.auto.commit: false
    #kafka.auto.commit.interval.ms: 1000
    #kafka.auto.offset.reset: latest
    #kafka.request.timeout.ms: 40000
    #kafka.session.timeout.ms: 30000
    #kafka.isolation.level: read_committed
    #kafka.max.poll.records: 1000
    # rocketMQ consumer
    #rocketmq.namespace:
    #rocketmq.namesrv.addr: 127.0.0.1:9876
    #rocketmq.batch.size: 1000
    #rocketmq.enable.message.trace: false
    #rocketmq.customized.trace.topic:
    #rocketmq.access.channel:
    #rocketmq.subscribe.filter:
    # rabbitMQ consumer
    #rabbitmq.host:
    #rabbitmq.virtual.host:
    #rabbitmq.username:
    #rabbitmq.password:
    #rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://<source-db-host>:<port>/cancal_mysql?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      # - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/cancal_mysql?useUnicode=true
          jdbc.username: root
          jdbc.password: 603409875
      # - name: rdb
      #   key: oracle1
      #   properties:
      #     jdbc.driverClassName: oracle.jdbc.OracleDriver
      #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #     jdbc.username: mytest
      #     jdbc.password: m121212
      # - name: rdb
      #   key: postgres1
      #   properties:
      #     jdbc.driverClassName: org.postgresql.Driver
      #     jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #     jdbc.username: postgres
      #     jdbc.password: 121212
      #     threads: 1
      #     commitSize: 3000
      # - name: hbase
      #   properties:
      #     hbase.zookeeper.quorum: 127.0.0.1
      #     hbase.zookeeper.property.clientPort: 2181
      #     zookeeper.znode.parent: /hbase
      # - name: es
      #   hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
      #   properties:
      #     mode: transport # or rest
      #     # security.auth: test:123456 # only used for rest mode
      #     cluster.name: elasticsearch
      # - name: kudu
      #   key: kudu
      #   properties:
      #     kudu.master.address: 127.0.0.1 # ',' split multi address
- Edit conf/rdb/tb_commodity_info.yml
Rename the mapping file after the table to be listened to; to listen to multiple tables, create one yml per table (see the sketch after this configuration).
mv mytest_user.yml tb_commodity_info.yml
Then edit the configuration file:
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: cancal_mysql
  table: tb_commodity_info
  targetTable: tb_commodity_info
  targetPk:
    id: id
  mapAll: true
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
  commitBatch: 3000 # batch commit size

## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
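To sync an additional table, copy this file and adjust the table names. A sketch for a hypothetical second table tb_order (the table name is illustrative only):

# conf/rdb/tb_order.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: cancal_mysql
  table: tb_order
  targetTable: tb_order
  targetPk:
    id: id
  mapAll: true
  commitBatch: 3000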
- Note: since I am storing into MySQL 8, the MySQL 8 driver jar mysql-connector-java-8.0.24.jar has to be added to the lib directory.
- Run the startup command:
sh bin/startup.sh
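To check whether the adapter came up, tail its log; the path below assumes the standard canal-adapter directory layout:

tail -f logs/adapter/adapter.log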
Sure enough, something went wrong. Because of that, we pulled the source code down and ran it there; after fixing the configuration issues described above, it finally worked.
III. Source code walkthrough
1. Clone the source code
git clone https://github.com/alibaba/canal.git
2. Open the project
3. Modify the configuration
The adapter's startup entry point is the launcher module under the client-adapter package.
Edit the configuration files: application.yml, and tb_commodity_info.yml under the rdb directory.
4. Start the project
The startup class is CanalAdapterApplication.
5. Problems encountered
- SPI cannot find the rdb extension implementation class
Tracing through the source: at startup, the rdb SPI is loaded according to the configuration, and the logger adapter needs to be loaded before it, so logger must be added to the configuration:
canalAdapters:
- instance: example # canal instance Name or mq topic name
  groups:
  - groupId: g1
    outerAdapters:
    - name: logger # fixes this bug
    - name: rdb
      key: mysql1
      properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
        jdbc.username: root
        jdbc.password: 603409875
- The same error persisted after the logger fix
At this point, run a Maven install on client-adapter; it will report quite a few missing jars. In my run, two jars were missing. After building those two jars, installing the adapter again worked.
Note: if it still fails, package the launcher module first and then run again; a sketch of the commands follows.
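A sketch of the Maven commands meant above, run from the client-adapter directory of the canal source tree (the exact flags are assumptions; adjust to your environment):

cd client-adapter
mvn clean install -DskipTests
# if it still fails, build the launcher module (and the modules it depends on) first:
mvn clean package -DskipTests -pl launcher -am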
- Bug: the key mysql1 cannot be found
Stepping into ConfigLoader#load(Properties envProperties), which loads the table mapping relations, and following the code, it reported that no mapping file existed.
To investigate, I went to where the SPI loads its files and found that the path being loaded was:
canal/1.1.5/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin/client-adapter.rdb-1.1.5-jar-with-dependencies.jar
Opening the jar showed it was a stale build from before my changes. I installed the rdb package again, re-packaged, and it finally ran.
Original article by 6024010. If you reproduce it, please credit the source: https://blog.ytso.com/244472.html