用过 canal 的都知道,canal 现在坑非常的多,本文记录一个我遇到的坑之一!
canal-adapter目前支持rdb、es、kafka、hbase等多个目标端的同步,最近几篇我会记录一下我在使用adapter向这些目标端同步时,是如何解决es、hbase版本适配,添加部分个性化需求,以及如何处理一些我遇到各种问题。今天先来记录一下adapter同步mysql。
同步mysql
application.yml 配置文件内容如下所示:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
username:
password:
vhost:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
username: root
password: 121212
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: rdb
# 在所有目标端的同步中key是一个必须要配置的参数,不配置可能会出现数据丢失的问题
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
jdbc.username: root
jdbc.password: 121212
conf/rdb下配置同步表信息。
dataSourceKey: defaultDS
destination: example
groupId: ''
# 在所有目标端的同步中outerAdapterKey是一个必须要配置的参数,不配置可能会出现数据丢失的问题
# 对应application.yml中key字段
outerAdapterKey: mysql1
concurrent: false
dbMapping:
commitBatch: 5000
etlCondition: null
mirrorDb: false
readBatch: 5000
database: mytest # 源数据源的database/shcema
table: tb1 # 源数据源表名
targetTable: mytest2.tb1 # 目标数据源的库名.表名
targetPk: # 主键映射
ID: ID # 如果是复合主键可以换行映射多个
mapAll: true
问题
在同步过程中,出现了insert成功,delete和update日志反馈成功,但实际数据没有同步的现象,查看同步源码。
com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.java
for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
//首先注意映射字段冒号前是目标字段名,冒号后是源端字段名
String targetColumnName = entry.getKey();
String srcColumnName = entry.getValue();
//如果未配置冒号后字段,将其处理成与冒号前一致
if (srcColumnName == null) {
srcColumnName = Util.cleanColumn(targetColumnName);
}
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
//主要问题出在这里
Object value = data.get(srcColumnName);
BatchExecutor.setValue(values, type, value);
}
这段代码,能够看出,同步过程中更新的value值是存储在data这个Map结构中的,所以这就要求srcColumnName这个字段的值要与源端数据表的字段名大小写完全一致(即使数据库本身对于字段大小写是不敏感的)
举例
源端tb有ID、id1、Id2三列
目标端tb2有id、ID1、id2三列
其实在数据查询过程中,由于数据库对字段大小写不敏感,所有无论怎么写都是没问题的,但在配置同步表过程中,他们的表现就有所不同。
targetPk: # 主键映射
ID: ID
targetPk: # 主键映射
ID:
targetPk: # 主键映射
id: ID
#以上几种写法,最后解析出来的srcColumnName 均为ID,所以可以从有binlog解析出来的数据转换得到的源端Map类型的data中获取到数据
targetPk: # 主键映射
id:
#类似这种写法,解析出的srcColumnName 均为id,
#而Map类型中data中只包含ID这个key,而不包含id这个key,
#所以就取不到相应的值,导致需要主键操作的update和delete语句执行无效
这是我在同步rdb时遇到的需要注意的地方,当然也可以根据需要将这部分代码改造一下,这里就不作描述了,代码具体位置已给出。
如果你也遇到了一些坑,欢迎加我微信号:canal,拉你进群学 canal。推荐阅读:https://mp.weixin.qq.com/s/Qerwilr7D8HDdzd1-jd_4w
: » canal-adapter趟坑记录
原创文章,作者:Carrie001128,如若转载,请注明出处:https://blog.ytso.com/252208.html