Canal: Persisting Data to MySQL



I. Java implementation

First we hand-write the logic in Java, which makes the business logic easier to follow later on.

1. Maven configuration:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.24</version>
        </dependency>

        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

2. application.properties configuration

# Server port
server.port=10000
# Application name
spring.application.name=canal-client

# Environment: dev, test, prod
spring.profiles.active=dev

# MySQL connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=603409875

3. Code implementation

  • First, declare a queue to receive the generated SQL statements

        // SQL queue: statements to be replayed on the target database
        private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

        @Resource
        private DataSource dataSource;
    
  • Write the main processing loop (the executeQueueSql() helper it calls and the Spring Boot wiring are sketched after this list)

        // Create the connection to the Canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe("cancal_mysql\\..*"); // subscribe to every table in the cancal_mysql database
            connector.rollback();
            try {
                while (true) {
                    // pull up to batchSize entries from the server without acknowledging them
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        // process the entries and build SQL statements
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    // once SQL statements have accumulated in the queue, replay them
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    
  • The data handling method

 /**
     * Process the Canal entries and dispatch by event type (insert / update / delete)
     *
     * @param entrys
     */
        private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entrys) {
                if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.DELETE) {
                        saveDeleteSql(entry);
                    } else if (eventType == CanalEntry.EventType.UPDATE) {
                        saveUpdateSql(entry);
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        saveInsertSql(entry);
                    }
                }
            }
        }
    
  • Methods that parse the row data and save the SQL statements

    • Delete (only a simple delete-by-key statement is built here; the methods below follow the same pattern)

      /**
           * Build and queue a DELETE statement
           *
           * @param entry
           */
          private void saveDeleteSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                      StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                      for (CanalEntry.Column column : columnList) {
                          if (column.getIsKey()) {
                            // only a single-column primary key is supported for now
                              if(column.getMysqlType().contains("varchar")){
                                  sql.append(column.getName() + "='" + column.getValue()+"'");
                              }else {
                                  sql.append(column.getName() + "=" + column.getValue());
                              }
                              break;
                          }
                      }
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
    • Insert

      /**
           * Build and queue an INSERT statement
           *
           * @param entry
           */
          private void saveInsertSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                      StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                      for (int i = 0; i < columnList.size(); i++) {
                          sql.append(columnList.get(i).getName());
                          if (i != columnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(") VALUES (");
                      for (int i = 0; i < columnList.size(); i++) {
                          sql.append("'" + columnList.get(i).getValue() + "'");
                          if (i != columnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(")");
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
    • Update

      /**
           * Build and queue an UPDATE statement
           *
           * @param entry
           */
          private void saveUpdateSql(CanalEntry.Entry entry) {
              try {
                  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                  List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                  for (CanalEntry.RowData rowData : rowDatasList) {
                      List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
                      StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                      for (int i = 0; i < newColumnList.size(); i++) {
                          sql.append(" " + newColumnList.get(i).getName()
                                  + " = '" + newColumnList.get(i).getValue() + "'");
                          if (i != newColumnList.size() - 1) {
                              sql.append(",");
                          }
                      }
                      sql.append(" where ");
                      List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                      for (CanalEntry.Column column : oldColumnList) {
                          if (column.getIsKey()) {
                            // only a single-column primary key is supported for now
                              if(column.getMysqlType().contains("varchar")){
                                  sql.append(column.getName() + "='" + column.getValue()+"'");
                              }else {
                                  sql.append(column.getName() + "=" + column.getValue());
                              }
                              break;
                          }
                      }
                      SQL_QUEUE.add(sql.toString());
                  }
              } catch (InvalidProtocolBufferException e) {
                  e.printStackTrace();
              }
          }
      
  • JDBC execution

 /**
     * Execute a single SQL statement against the target database
     * @param sql
     */
        public void execute(String sql) {
            Connection con = null;
            try {
                if(null == sql) return;
                con = dataSource.getConnection();
                QueryRunner qr = new QueryRunner();
                int row = qr.update(con, sql);
                System.out.println("update: "+ row);
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                DbUtils.closeQuietly(con);
            }
        }
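
The main loop above calls an executeQueueSql() method that is not shown in the original code. A minimal sketch, assuming it simply drains SQL_QUEUE and hands each statement to the execute() method above:

     /**
         * Drain the SQL queue and run every statement against the target database.
         * (Sketch only: this helper is referenced by the main loop but not shown above.)
         */
        public void executeQueueSql() {
            int size = SQL_QUEUE.size();
            for (int i = 0; i < size; i++) {
                String sql = SQL_QUEUE.poll();
                System.out.println("[sql] ----> " + sql);
                this.execute(sql);
            }
        }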
    

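The loop also has to be started when the Spring Boot application boots, because it blocks forever. One possible wiring, assuming the code above lives in a component called CanalClient whose run() method contains the loop (the class and method names here are illustrative, not from the original article):

    import javax.annotation.Resource;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    // Hypothetical wiring: kick off the Canal client loop when the application starts.
    @Component
    public class CanalClientRunner implements CommandLineRunner {

        @Resource
        private CanalClient canalClient; // the bean holding SQL_QUEUE, the main loop and execute()

        @Override
        public void run(String... args) {
            // the Canal loop blocks forever, so run it on its own thread
            new Thread(canalClient::run).start();
        }
    }
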
4. Result

Run the following SQL statement:

INSERT INTO `tb_commodity_info` ( `id`, `commodity_name`, `commodity_price`, `number`, `description` )
VALUES
	( '030acbd3b71011ecb9760242ac110005', '测试0001', '5.88', 11, '描述信息0001' );
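
For this row, the Java client above would build and enqueue roughly the following statement (assuming the table's columns are in the order shown): insert into tb_commodity_info (id,commodity_name,commodity_price,number,description) VALUES ('030acbd3b71011ecb9760242ac110005','测试0001','5.88','11','描述信息0001'). Note that saveInsertSql quotes every value as a string, including the numeric number column; MySQL will still coerce it on insert.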

Execution result:

(screenshot)

Database 1:

(screenshot)

Database 2:

(screenshot)

II. Implementation with canal.adapter

The code above can already handle basic data migration; next we implement the same thing with the adapter-based approach.

1. Install canal.adapter

  • First, download the adapter from GitHub

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
# alternatively, the 1.1.5 snapshot build:
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz

  • Unpack it

mkdir /tmp/canal_adapter
tar zxvf canal.adapter-1.1.5.tar.gz  -C /tmp/canal_adapter
  • Edit the configuration file conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    #kafka.bootstrap.servers: 127.0.0.1:9092
    #kafka.enable.auto.commit: false
    #kafka.auto.commit.interval.ms: 1000
    #kafka.auto.offset.reset: latest
    #kafka.request.timeout.ms: 40000
#    kafka.session.timeout.ms: 30000
#    kafka.isolation.level: read_committed
#    kafka.max.poll.records: 1000
    # rocketMQ consumer
#    rocketmq.namespace:
#    rocketmq.namesrv.addr: 127.0.0.1:9876
#    rocketmq.batch.size: 1000
#    rocketmq.enable.message.trace: false
#    rocketmq.customized.trace.topic:
#    rocketmq.access.channel:
#    rocketmq.subscribe.filter:
    # rabbitMQ consumer
#    rabbitmq.host:
#    rabbitmq.virtual.host:
#    rabbitmq.username:
#    rabbitmq.password:
#    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://<source-db-host>:<port>/cancal_mysql?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
#      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/cancal_mysql?useUnicode=true
          jdbc.username: root
          jdbc.password: 603409875
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

  • Edit conf/rdb/tb_commodity_info.yml

    Rename the yml file after the table to be synchronized; to synchronize multiple tables, create one yml file per table.

    mv mytest_user.yml tb_commodity_info.yml
    

    Then edit the configuration file:

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    outerAdapterKey: mysql1
    concurrent: true
    dbMapping:
      database: cancal_mysql
      table: tb_commodity_info
      targetTable: tb_commodity_info
      targetPk:
        id: id
      mapAll: true
    #  targetColumns:
    #    id:
    #    name:
    #    role_id:
    #    c_time:
    #    test1:
    #  etlCondition: "where c_time>={}"
      commitBatch: 3000 # batch commit size
    
    
    ## Mirror schema synchronize config
    #dataSourceKey: defaultDS
    #destination: example
    #groupId: g1
    #outerAdapterKey: mysql1
    #concurrent: true
    #dbMapping:
    #  mirrorDb: true
    #  database: mytest
    
    
  • Note: since the database I am writing to is MySQL 8, mysql-connector-java-8.0.24.jar needs to be added to the adapter's lib directory.

  • Run the startup command

sh bin/startup.sh

And, as you might expect, something unexpectedly went wrong.

Because of the failure, we pulled the source code down, ran it locally, fixed the configuration issues described above, and finally got it working.

III. Source code analysis

1. Clone the source code

git clone https://github.com/alibaba/canal.git

2. Open the project

(screenshot)

3. Update the configuration

The adapter's startup entry point is the launcher module under the client-adapter package.

Edit the configuration files: application.yml and tb_commodity_info.yml under the rdb directory.

4. Start the project

The startup class is CanalAdapterApplication.

5. Issues encountered

  • SPI cannot find the rdb extension implementation class

(screenshot)

Tracing back through the source code:

(screenshot)

At startup, the rdb SPI is loaded according to the configuration, but the logger adapter has to be loaded before it, so logger must be added to the configuration:

canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
      - groupId: g1
        outerAdapters:
          - name: logger   # added to fix this issue
          - name: rdb
            key: mysql1
            properties:
              jdbc.driverClassName: com.mysql.jdbc.Driver
              jdbc.url: jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
              jdbc.username: root
              jdbc.password: 603409875
  • After fixing the logger issue, the same error still appeared

At this point, run a Maven install on client-adapter; quite a few missing-jar errors will show up. In my case two jars were missing.
(screenshot)

After packaging those two jars, install the adapter again and it will build.

Note: if it still does not work, package the launcher module first and then try again.

  • Bug: the mysql1 key cannot be found

Stepping into the ConfigLoader#load(Properties envProperties) method, which loads the table mapping configuration, and following the code, it turned out that no mapping file was found.

To track this down, I went back to where the SPI loads its files and found that the path being loaded was:

canal/1.1.5/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin/client-adapter.rdb-1.1.5-jar-with-dependencies.jar

Opening the jar showed that it was the old jar from before the changes. I then checked the installed rdb package, repackaged it, and the adapter finally ran.

