阿里巴巴开源的 Canal 使用教程

我百度了一下 Canal,发现与 Canal 相关的技术文章并不多,再加上我上一篇文章《阿里 canal 内存溢出 Java heap space 问题解决》中走入的误区,导致了我想要写一篇关于 Canal 教程的文章。所以便有了本文,下面就和我一起来看看如何使用 canal 吧。

Canal 教程

根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。

mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。

经 canal 解析过的对象,我们使用起来就非常的方便了。

再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。

根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.2.tar.gz 进行解析 MySQL 的 binlog 日志。

Canal 部署安装

下载后,复制 canal.deployer-1.1.2.tar.gz 到 MySQL 主机上,比如放在 /var/xttblog/otter 目录下。然后依次执行下面的命令:

mkdir canal
cd canal
tar -zxvf ../canal.deployer-1.1.2.tar.gz

然后修改 canal 的配置文件 vi conf/example/instance.properties。

instance.properties 配置

这三项改成你自己的,比如我的配置如下:

canal.instance.dbUsername=xttblog
canal.instance.dbPassword=xttblog
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =xttblog

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。

show variables like 'binlog_format';
show variables like 'log_bin';
select version();

canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。

执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。

执行 vi /etc/my.cnf 命令。添加下面 3 个配置。

log-bin=mysql-bin #添加这一行就ok 
binlog-format=ROW #选择row模式 
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着执行 sudo service mysqld restart 重启 MySQL。

需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。

如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。

CREATE USER xttblog IDENTIFIED BY ‘xttblog’; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘‘xttblog’’@’%’;
FLUSH PRIVILEGES;

MySQL 启动后,就可以开启 canal 服务了。

/var/xttblog/otter/canal/bin/startup.sh

开启后,观察 canal 服务的日志,确保服务正常。

tail -n 200 /var/xttblog/otter/canal/logs/canal/canal.log

查看 canal 的日志

确定没有问题后,开始编写我们的测试程序。

pom.xml 中导入下面的依赖。

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

然后编写下面的 Java 代码。

package com.xttblog.canal.test;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * CanalTest
 * @author www.xttblog.com
 * @date 2019/2/27 上午10:01
 */
public class CanalTest {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*//..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] /n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN 
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event " +
                        "has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; " +
                            "binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + 
                    " : " + column.getValue() + 
                    "    update=" + column.getUpdated());
        }
    }
}

然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。

阿里巴巴开源的 Canal 使用教程

: » 阿里巴巴开源的 Canal 使用教程

原创文章,作者:sunnyman218,如若转载,请注明出处:https://blog.ytso.com/tech/java/252040.html

(0)
上一篇 2022年5月3日 23:15
下一篇 2022年5月3日 23:19

相关推荐

发表回复

登录后才能评论