我百度了一下 Canal,发现与 Canal 相关的技术文章并不多,再加上我上一篇文章《阿里 canal 内存溢出 Java heap space 问题解决》中走入的误区,导致了我想要写一篇关于 Canal 教程的文章。所以便有了本文,下面就和我一起来看看如何使用 canal 吧。

根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.2.tar.gz 进行解析 MySQL 的 binlog 日志。

下载后,复制 canal.deployer-1.1.2.tar.gz 到 MySQL 主机上,比如放在 /var/xttblog/otter 目录下。然后依次执行下面的命令:
mkdir canal cd canal tar -zxvf ../canal.deployer-1.1.2.tar.gz
然后修改 canal 的配置文件 vi conf/example/instance.properties。

这三项改成你自己的,比如我的配置如下:
canal.instance.dbUsername=xttblog canal.instance.dbPassword=xttblog canal.instance.connectionCharset = UTF-8 canal.instance.defaultDatabaseName =xttblog
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
show variables like 'binlog_format'; show variables like 'log_bin'; select version();
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vi /etc/my.cnf 命令。添加下面 3 个配置。
log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
CREATE USER xttblog IDENTIFIED BY ‘xttblog’; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘‘xttblog’’@’%’; FLUSH PRIVILEGES;
MySQL 启动后,就可以开启 canal 服务了。
/var/xttblog/otter/canal/bin/startup.sh
开启后,观察 canal 服务的日志,确保服务正常。
tail -n 200 /var/xttblog/otter/canal/logs/canal/canal.log

确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
然后编写下面的 Java 代码。
package com.xttblog.canal.test;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* CanalTest
* @author www.xttblog.com
* @date 2019/2/27 上午10:01
*/
public class CanalTest {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*//..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] /n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event " +
"has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> " +
"binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() +
" : " + column.getValue() +
" update=" + column.getUpdated());
}
}
}
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。

: » 阿里巴巴开源的 Canal 使用教程
原创文章,作者:sunnyman218,如若转载,请注明出处:https://blog.ytso.com/tech/java/252040.html