GoldenGate: Syncing MySQL to Kafka

Oracle GoldenGate provides real-time, low-impact capture, routing, transformation, and delivery of transactional data across heterogeneous environments.

GoldenGate Architecture

[architecture diagram: Extract → local trail → Data Pump → Collector → remote trail → Replicat]

GoldenGate Concepts

  • The Manager process is GoldenGate's control process and runs on both the source and the target. Its main duties are starting, monitoring, and restarting the other GoldenGate processes; reporting errors and events; allocating data storage space; and issuing threshold reports. There is exactly one Manager process on each side.
  • The Extract process runs on the source side and captures changed data from the source tables or transaction logs.
  • The Data Pump process also runs on the source side; it reads the local trail files produced there and ships the trail data, in blocks, to the target over TCP/IP.
  • The counterpart of the Data Pump on the target is the Server Collector process. In practice it needs no attention from us: it requires no configuration, so it is effectively transparent. It runs on the target and reassembles the data delivered by Extract/Pump into remote trail files.
  • The Replicat process, often called the apply process, runs on the target. It is the last stop in the pipeline and reads the contents of the target-side trail files and applies them.

  • OGG trail files

    • To deliver database transaction data from the source to the target more efficiently and safely, GoldenGate introduces the concept of trail files. After Extract captures data, GoldenGate converts the transaction records into files in a proprietary GoldenGate format; the Pump then delivers the source-side trail files to the target, so these files exist on both ends.
    • Trail files exist to guard against single points of failure: they persist the transaction data, and a checkpoint mechanism records the read/write positions, so that after a failure the data can be retransmitted from the position recorded by the checkpoint.
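
A minimal Python sketch of the checkpoint idea: a reader persists its byte offset into the trail file, so after a crash it resumes exactly where it left off. File names and the JSON record format are hypothetical, not OGG's actual on-disk layout.

```python
import json
import os
import tempfile

# Minimal sketch of checkpoint-based resume, analogous to how a trail-file
# reader records its position so a restart neither re-reads nor loses data.
def read_from_checkpoint(trail_path, ckpt_path):
    """Read records after the last checkpointed offset, then advance the checkpoint."""
    offset = 0
    if os.path.exists(ckpt_path):
        with open(ckpt_path) as f:
            offset = int(f.read())
    records = []
    with open(trail_path) as f:
        f.seek(offset)
        while True:
            line = f.readline()           # readline() keeps f.tell() usable
            if not line:
                break
            records.append(json.loads(line))
        new_offset = f.tell()
    with open(ckpt_path, "w") as f:       # persist position for crash recovery
        f.write(str(new_offset))
    return records

# Usage: read three records, then append one more; the second call returns
# only the new record, because the checkpoint recorded our position.
d = tempfile.mkdtemp()
trail, ckpt = os.path.join(d, "W3.trail"), os.path.join(d, "W3.ckpt")
with open(trail, "w") as f:
    for i in range(3):
        f.write(json.dumps({"op": "I", "id": i}) + "\n")
print(len(read_from_checkpoint(trail, ckpt)))   # 3
with open(trail, "a") as f:
    f.write(json.dumps({"op": "U", "id": 1}) + "\n")
print(len(read_from_checkpoint(trail, ckpt)))   # 1
```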

System Environment Setup (source and target)

  • JDK version

    jdk1.8
  • Add environment variables

    export GGS_HOME=/opt/ggs 
    export PATH=$PATH:$GGS_HOME

    In this guide, GoldenGate is installed under /opt/ggs.

  • Hosts

    yp-data02 (source; the machine running MySQL) 
     
    yp-data01 (target)
  • GoldenGate versions

    Oracle GoldenGate 12.3.0.1.1 for MySQL on Linux x86-64   (source) 
     
    Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 (target)
    • Download

      http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html
  • Configure the MySQL binlog; the MySQL instance must be restarted afterwards (source)

     
    # vim /etc/my.cnf 
     
     
    log-bin=mysql-bin 
    binlog_format=row 
    server-id=1
  • Create a test table in MySQL (source database)

    CREATE TABLE `test`.`wms_test` ( 
    `id` int(11) NOT NULL AUTO_INCREMENT, 
    `first_name` varchar(60) DEFAULT NULL, 
    `last_name` varchar(60) DEFAULT NULL, 
    `sex` varchar(45) DEFAULT NULL, 
    `address` varchar(200) DEFAULT NULL, 
    `flag` int(11) DEFAULT NULL, 
    PRIMARY KEY (`id`) 
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
  • A running Kafka environment (target)

Source-Side Configuration (MySQL)

  • Start the GGSCI console

    $GGS_HOME/ggsci
  • Log in to MySQL

    dblogin sourcedb test@localhost:3306,userid root,password 123456  
  • Configure the Manager

    • CREATE SUBDIRS (creates the working directories)

    • Parameter file

       
      # edit param mgr 
       
      port 17809 
      dynamicportlist 17800-18000 
      purgeoldextracts ./dirdat/*,usecheckpoints, minkeepdays 7
    • Start the Manager

       
      #  start mgr 
       
      Check the running processes 
       
      #  info all 
      
  • Configure the Extract

     # edit param ext_wpkg   
     
    extract ext_wpkg 
    setenv (MYSQL_HOME="/var/lib/mysql") 
    tranlogoptions altlogdest /var/lib/mysql/mysql-bin.index 
    dboptions host localhost,connectionport 3306 
    sourcedb test, userid root,password 123456 
    exttrail /opt/ggs/dirdat/W3 
    dynamicresolution 
    gettruncates 
    GETUPDATEBEFORES 
    NOCOMPRESSDELETES 
    NOCOMPRESSUPDATES 
    table test.wms_test;
     
    #      ADD EXTRACT ext_wpkg, tranlog,begin now 
     
     
    #      ADD EXTTRAIL /opt/ggs/dirdat/W3, EXTRACT ext_wpkg 
    
  • Configure the Pump

     
    # edit param pum_wpkg   
     
     
    extract pum_wpkg 
    rmthost yp-data01,mgrport 17809 
    rmttrail /opt/ggs/dirdat/W3 
    passthru 
    gettruncates 
    table test.wms_test;
     # ADD EXTRACT pum_wpkg,EXTTRAILSOURCE /opt/ggs/dirdat/W3 
     # ADD RMTTRAIL /opt/ggs/dirdat/W3, EXTRACT pum_wpkg

Note: /opt/ggs/dirdat is the data directory where trail files are stored under the target-side OGG installation.

  • Configure defgen

     
    # edit param defgen_wpkg 
     
    defsfile /opt/ggs/dirdef/defgen_wpkg.prm 
    sourcedb test@localhost:3306,userid root,password 123456 
    table test.wms_test; 
    

    Note: this is used to generate the table/column definition mappings.

  • Generate the defgen table definitions
    From the OGG installation root, run:

    ./defgen paramfile /opt/ggs/dirprm/defgen_wpkg.prm

    Note: copy the dirdef/defgen_wpkg.prm file to the dirdef/ directory on the target.

  • Start the Extract and the Pump

     
    #      start ext_wpkg 
     
     
    #      start pum_wpkg 
     
     
    #      info all 
    
  • View Extract statistics

     
    # stats EXT_WPKG 
     
     
     
    Sending STATS request to EXTRACT EXT_WPKG ... 
     
    Start of Statistics at 2017-12-12 14:55:43. 
     
    Output to /opt/ggs/dirdat/W3: 
     
    Extracting from test.wms_test to test.wms_test: 
     
    *** Total statistics since 2017-12-12 11:08:40 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total befores                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
     
    *** Daily statistics since 2017-12-12 11:08:40 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total befores                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
     
    
  • View Pump statistics

    # stats PUM_WPKG 
     
    Sending STATS request to EXTRACT PUM_WPKG ... 
     
    Start of Statistics at 2017-12-12 15:00:05. 
     
    Output to /opt/ggs/dirdat/W3: 
     
    Extracting from test.wms_test to test.wms_test: 
     
    *** Total statistics since 2017-12-12 11:08:43 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total befores                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
     
    *** Daily statistics since 2017-12-12 11:08:43 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total befores                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
    

Target-Side Configuration

  • $GGS_HOME/ggsci

  • CREATE SUBDIRS (creates the working directories)

  • Configure the Manager

     
    # edit param mgr  
     
     
    PORT 17809 
    DYNAMICPORTLIST 17810-17909 
    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 
    PURGEOLDEXTRACTS /opt/ggs/dirdat/*,usecheckpoints, minkeepdays 3 
    
  • Configure the Replicat

     
    # edit param REP_WPKG 
     
     
    REPLICAT rep_wpkg 
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props 
    REPORTCOUNT EVERY 1 MINUTES, RATE 
    GROUPTRANSOPS 10000 
    MAP test.*,TARGET test.*;
    add replicat rep_wpkg, exttrail /opt/ggs/dirdat/W3,begin now
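
The MAP test.*, TARGET test.*; statement maps source tables to target tables by wildcard. A rough sketch of that resolution in Python (the function and its rules are illustrative, not OGG's implementation):

```python
from fnmatch import fnmatch

# Illustrative wildcard MAP resolution: a MAP source pattern plus a TARGET
# template, applied to a fully qualified source table name.
def map_table(source_table, map_pattern="test.*", target_template="test.*"):
    """Return the target table for a source table, or None if the MAP doesn't match."""
    if not fnmatch(source_table, map_pattern):
        return None
    # '*' in the target template is replaced by the source short table name
    short_name = source_table.split(".", 1)[1]
    return target_template.replace("*", short_name)

print(map_table("test.wms_test"))   # test.wms_test
print(map_table("other.orders"))    # None (not covered by this MAP)
```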
  • Configure the Kafka handler

    • dirprm/kafka.props

      gg.handlerlist = kafkahandler 
      gg.handler.kafkahandler.type=kafka 
      gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties 
       
      #The following resolves the topic name using the short table name 
       
      gg.handler.kafkahandler.topicMappingTemplate=${tableName} 
       
      #The following selects the message key using the concatenated primary keys 
       
      gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} 
      gg.handler.kafkahandler.format=json_row 
      gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic 
      gg.handler.kafkahandler.BlockingSend=false 
      gg.handler.kafkahandler.includeTokens=false 
      gg.handler.kafkahandler.mode=op 
       
       
      goldengate.userexit.timestamp=utc 
      goldengate.userexit.writers=javawriter 
      javawriter.stats.display=TRUE 
      javawriter.stats.full=TRUE 
       
      gg.log=log4j 
      gg.log.level=INFO 
       
      gg.report.time=30sec 
       
       
      #Sample gg.classpath for Apache Kafka 
       
      gg.classpath=dirprm/:/opt/kafka_2.10-0.10.0.1/libs/* 
       
      #Sample gg.classpath for HDP 
       
       
      #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/* 
       
       
      javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ggs/ggjava/ggjava.jar
      • gg.handler.kafkahandler.topicMappingTemplate controls how Kafka topic names are resolved: either a literal topic name, or a template with placeholders such as ${tableName}, in which case each table maps to its own topic
      • gg.handler.kafkahandler.SchemaTopicName names the topic that receives each table's schema information
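
Placeholder resolution for templates such as ${tableName} and ${primaryKeys} can be sketched roughly as follows; this is a simplified stand-in for the handler's own resolver, and the helper name is made up:

```python
import re

# Simplified stand-in for template resolution: substitute ${...} placeholders
# from an operation's metadata (a real handler supports many more keywords).
def resolve_template(template, table_name, primary_keys):
    values = {
        "tableName": table_name.split(".")[-1],   # short table name
        "fullyQualifiedTableName": table_name,
        "primaryKeys": "_".join(primary_keys),    # concatenated primary keys
    }
    return re.sub(r"\$\{(\w+)\}", lambda m: values[m.group(1)], template)

print(resolve_template("${tableName}", "TEST.WMS_TEST", ["16"]))    # WMS_TEST
print(resolve_template("${primaryKeys}", "TEST.WMS_TEST", ["16"]))  # 16
```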
    • Configure the Kafka connection in custom_kafka_producer.properties

      bootstrap.servers=localhost:9092 
      acks=1 
      reconnect.backoff.ms=1000 
       
      value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
      key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
       
      # 16 KB batch size per partition 
       
      batch.size=16384 
      linger.ms=0 
      
  • Start the Manager and the Replicat

    start mgr  
    start replicat REP_WPKG
  • View Replicat statistics

    # stats REP_WPKG

    Sending STATS request to REPLICAT REP_WPKG ... 
     
    Start of Statistics at 2017-12-12 15:02:14. 
     
    Replicating from test.wms_test to test.wms_test: 
     
    *** Total statistics since 2017-12-12 11:10:41 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
     
    *** Daily statistics since 2017-12-12 11:10:41 *** 
        Total inserts                                  1.00 
        Total updates                                  1.00 
        Total deletes                                  1.00 
        Total discards                                 0.00 
        Total operations                               3.00 
    


  • Start a Kafka consumer and inspect the messages

    Two topics are created on Kafka:

    • mySchemaTopic: the topic that receives schema information

      • Consume the schema messages from Kafka
       # kafka-console-consumer.sh --zookeeper localhost:2181 --topic  mySchemaTopic  --from-beginning 
       
       { 
          "$schema":"http://json-schema.org/draft-04/schema#", 
          "title":"TEST.WMS_TEST", 
          "description":"JSON schema for table TEST.WMS_TEST", 
          "definitions":{ 
              "tokens":{ 
                  "type":"object", 
                  "description":"Token keys and values are free form key value pairs.", 
                  "properties":{ 
       
                  }, 
                  "additionalProperties":true 
              } 
          }, 
          "type":"object", 
          "properties":{ 
              "table":{ 
                  "description":"The fully qualified table name", 
                  "type":"string" 
              }, 
              "op_type":{ 
                  "description":"The operation type", 
                  "type":"string" 
              }, 
              "op_ts":{ 
                  "description":"The operation timestamp", 
                  "type":"string" 
              }, 
              "current_ts":{ 
                  "description":"The current processing timestamp", 
                  "type":"string" 
              }, 
              "pos":{ 
                  "description":"The position of the operation in the data source", 
                  "type":"string" 
              }, 
              "primary_keys":{ 
                  "description":"Array of the primary key column names.", 
                  "type":"array", 
                  "items":{ 
                      "type":"string" 
                  }, 
                  "minItems":0, 
                  "uniqueItems":true 
              }, 
              "tokens":{ 
                  "$ref":"#/definitions/tokens" 
              }, 
              "id":{ 
                  "type":[ 
                      "integer", 
                      "null" 
                  ] 
              }, 
              "first_name":{ 
                  "type":[ 
                      "string", 
                      "null" 
                  ] 
              }, 
              "last_name":{ 
                  "type":[ 
                      "string", 
                      "null" 
                  ] 
              }, 
              "sex":{ 
                  "type":[ 
                      "string", 
                      "null" 
                  ] 
              }, 
              "address":{ 
                  "type":[ 
                      "string", 
                      "null" 
                  ] 
              }, 
              "flag":{ 
                  "type":[ 
                      "integer", 
                      "null" 
                  ] 
              } 
          }, 
          "required":[ 
              "table", 
              "op_type", 
              "op_ts", 
              "current_ts", 
              "pos" 
          ], 
          "additionalProperties":false 
      }
    • wms_test: one topic per table (multiple tables can also be mapped to a single topic)

      • Read the change records from the topic

        kafka-console-consumer.sh --zookeeper localhost:2181 --topic wms_test  --from-beginning
        • Insert a row
        INSERT INTO `test`.`wms_test` (`first_name`, `last_name`, `sex`, `address`, `flag`) VALUES ('a', 'b', 'c', 'd', '1'); 
         
         
         
        # message format received on Kafka 
         
         
        { 
            "table":"TEST.WMS_TEST", 
            "op_type":"I", 
            "op_ts":"2017-12-12 09:47:44.349517", 
            "current_ts":"2017-12-12T17:47:50.415000", 
            "pos":"00000000000000003880", 
            "id":16, 
            "first_name":"a", 
            "last_name":"b", 
            "sex":"c", 
            "address":"d", 
            "flag":1 
        } 
        
        • Update a row
        UPDATE `test`.`wms_test` SET `flag`='2' WHERE `id`='16'; 
         
         
        # message format received on Kafka 
         
        { 
            "table":"TEST.WMS_TEST", 
            "op_type":"U", 
            "op_ts":"2017-12-12 09:49:27.349051", 
            "current_ts":"2017-12-12T17:49:32.457000", 
            "pos":"00000000000000004268", 
            "id":16, 
            "first_name":"a", 
            "last_name":"b", 
            "sex":"c", 
            "address":"d", 
            "flag":2 
        } 
        
        • Delete a row
         
        DELETE FROM `test`.`wms_test` WHERE `id`='16'; 
         
         
         
        # message format received on Kafka 
         
        { 
            "table":"TEST.WMS_TEST", 
            "op_type":"D", 
            "op_ts":"2017-12-12 09:51:00.348680", 
            "current_ts":"2017-12-12T17:51:06.491000", 
            "pos":"00000000000000004376", 
            "id":16, 
            "first_name":"a", 
            "last_name":"b", 
            "sex":"c", 
            "address":"d", 
            "flag":2 
        } 
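
A downstream consumer can replay these json_row messages into a current-state view by switching on op_type. A minimal sketch over messages shaped like the samples above (a plain function stands in for a real Kafka consumer loop):

```python
import json

# Replay I/U/D change records into a current-state table keyed by primary key.
# Only the json_row fields shown in the sample messages above are assumed.
def apply_op(state, message):
    row = json.loads(message)
    meta = {"table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "tokens"}
    key = row["id"]                       # wms_test's primary key column
    if row["op_type"] in ("I", "U"):      # insert/update: upsert the row columns
        state[key] = {k: v for k, v in row.items() if k not in meta}
    elif row["op_type"] == "D":           # delete: drop the row
        state.pop(key, None)
    return state

state = {}
apply_op(state, '{"table":"TEST.WMS_TEST","op_type":"I","op_ts":"t","current_ts":"t","pos":"1","id":16,"flag":1}')
apply_op(state, '{"table":"TEST.WMS_TEST","op_type":"U","op_ts":"t","current_ts":"t","pos":"2","id":16,"flag":2}')
print(state)   # {16: {'id': 16, 'flag': 2}}
apply_op(state, '{"table":"TEST.WMS_TEST","op_type":"D","op_ts":"t","current_ts":"t","pos":"3","id":16,"flag":2}')
print(state)   # {}
```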
        

Troubleshooting

  • Error loading Java VM runtime library: (2 No such file or directory)

    • Fix

      Add the JVM library directories to LD_LIBRARY_PATH (the variable should list directories, not .so files):

      export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
  • DB2 installation error: /ggsci: error while loading shared libraries: libdb2.so.1: cannot open shared object file: No such file or directory

    • Fix

    Add the DB2 library directory to the library path:

    export LD_LIBRARY_PATH=$DB2_HOME/lib64/:$LD_LIBRARY_PATH 
    

Common Kafka Commands

  • Start ZooKeeper

    zookeeper-server-start.sh config/zookeeper.properties &
  • Start Kafka

    bin/kafka-server-start.sh config/server.properties &
  • Create a topic

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • Run a producer

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
  • Run a consumer

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
  • List the topics that have been created

    kafka-topics.sh --list --zookeeper localhost:2181

Original article by Maggie-Hunter. When reproducing, please credit the source: https://blog.ytso.com/9551.html
