GoldenGate: Syncing MySQL to Kafka
Oracle GoldenGate provides real-time, low-impact capture, routing, transformation, and delivery of transactional data across heterogeneous environments.
GoldenGate Architecture
GoldenGate Concepts
- The Manager process is GoldenGate's control process and runs on both the source and the target. Its main responsibilities are: starting, monitoring, and restarting the other GoldenGate processes; reporting errors and events; allocating data storage space; and issuing threshold reports. There is exactly one Manager process on each side.
- The Extract process runs on the source database side and captures data from the source tables or transaction logs.
- The Data Pump process also runs on the source side; it reads the local trail files produced on the source and sends the trail data in blocks to the target over TCP/IP.
- The counterpart of the Data Pump on the target side is the Server Collector process. It needs no attention in practice, since it requires no configuration and is therefore transparent to us. It runs on the target, and its job is to reassemble the data delivered by Extract/Pump into remote trail files.
- The Replicat process, often called the apply process, runs on the target and is the last stop in the pipeline; it reads the contents of the target-side trail files and applies them.

About OGG Trail Files
- To deliver database transaction data from source to target more efficiently and safely, GoldenGate introduces the concept of the trail file. As mentioned above, after Extract captures data, GoldenGate converts the transaction records into files in a proprietary GoldenGate format. The Pump then delivers the source-side trail files to the target, so trail files exist on both sides.
- Trail files exist to prevent single points of failure: they persist the transaction data, and a checkpoint mechanism records the current read and write positions. If a failure occurs, data can be retransmitted from the position recorded by the checkpoint.
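The checkpoint idea above can be sketched in a few lines: a reader persists the offset of the last record it fully applied, and on restart it resumes from that offset instead of the beginning. This is an illustrative sketch in Python, not OGG's actual format; the file layout and record shape are invented for the example.

```python
import json
import os
import tempfile

def read_with_checkpoint(trail_path, ckpt_path, apply_fn):
    """Replay records from a trail-like file, resuming at the persisted offset."""
    pos = 0
    if os.path.exists(ckpt_path):
        with open(ckpt_path) as f:
            pos = json.load(f)["pos"]
    with open(trail_path) as trail:
        trail.seek(pos)
        while True:
            line = trail.readline()
            if not line:
                break
            apply_fn(json.loads(line))
            pos = trail.tell()
            # Persist the position only after the record is applied, so a
            # crash replays at most the in-flight record.
            with open(ckpt_path, "w") as f:
                json.dump({"pos": pos}, f)

# Demo: two records, then a simulated restart.
d = tempfile.mkdtemp()
trail = os.path.join(d, "trail")
ckpt = os.path.join(d, "ckpt")
with open(trail, "w") as f:
    f.write('{"op": "I", "id": 1}\n{"op": "U", "id": 1}\n')

applied = []
read_with_checkpoint(trail, ckpt, applied.append)  # first run applies both records
read_with_checkpoint(trail, ckpt, applied.append)  # "restart": nothing is re-applied
print(len(applied))  # 2
```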
System Environment (source and target)
- JDK version: jdk1.8
- Environment variables (this guide assumes GoldenGate is installed under /opt/ggs):

```shell
export GGS_HOME=/opt/ggs
export PATH=$PATH:$GGS_HOME
```

- Hosts: yp-data02 (source, where MySQL runs), yp-data01 (target)
- GoldenGate versions:
  - Oracle GoldenGate 12.3.0.1.1 for MySQL on Linux x86-64 (source)
  - Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 (target)
- Download: http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html
Configure the MySQL binlog (source side); the MySQL instance must be restarted afterwards:

```shell
# vim /etc/my.cnf
log-bin=mysql-bin
binlog_format=row
server-id=1
```
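Row-format binary logging is what Extract reads, so it is worth verifying these settings before starting OGG. A small sketch that parses a my.cnf fragment and reports missing or wrong values (the checks mirror the three settings above; a real deployment would query `SHOW VARIABLES` instead):

```python
def check_binlog_config(cnf_text):
    """Return a list of problems with the binlog settings GoldenGate needs."""
    opts = {}
    for line in cnf_text.splitlines():
        line = line.strip()
        if "=" in line and not line.startswith("#"):
            key, value = line.split("=", 1)
            opts[key.strip()] = value.strip()
    problems = []
    if "log-bin" not in opts and "log_bin" not in opts:
        problems.append("binary logging is not enabled (log-bin)")
    if opts.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format must be 'row' for OGG capture")
    if "server-id" not in opts and "server_id" not in opts:
        problems.append("server-id is not set")
    return problems

print(check_binlog_config("log-bin=mysql-bin\nbinlog_format=row\nserver-id=1"))  # []
```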
Create a test table in MySQL (source database):

```sql
CREATE TABLE `test`.`wms_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(60) DEFAULT NULL,
  `last_name` varchar(60) DEFAULT NULL,
  `sex` varchar(45) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `flag` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
```
A running Kafka environment is required on the target side.
Source Configuration (MySQL)
- Start the GGSCI console:

```shell
$GGS_HOME/ggsci
```

- Log in with the MySQL user:

```
dblogin sourcedb test@localhost:3306,userid root,password 123456
```
- Configure the Manager. First run CREATE SUBDIRS to create the working directories, then edit the Manager parameters:

```
# edit param mgr
port 17809
dynamicportlist 17800-18000
purgeoldextracts ./dirdat/*,usecheckpoints, minkeepdays 7
```

- Start the Manager and check the process status:

```
# start mgr
# info all
```
- Configure the Extract:

```
# edit param ext_wpkg
extract ext_wpkg
setenv (MYSQL_HOME="/var/lib/mysql")
tranlogoptions altlogdest /var/lib/mysql/mysql-bin.index
dboptions host localhost,connectionport 3306
sourcedb test, userid root,password 123456
exttrail /opt/ggs/dirdat/W3
dynamicresolution
gettruncates
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
table test.wms_test;
```

Then register the Extract and its local trail:

```
# ADD EXTRACT ext_wpkg, tranlog,begin now
# ADD EXTTRAIL /opt/ggs/dirdat/W3, EXTRACT ext_wpkg
```
- Configure the Data Pump:

```
# edit param pum_wpkg
extract pum_wpkg
rmthost yp-data01,mgrport 17809
rmttrail /opt/ggs/dirdat/W3
passthru
gettruncates
table test.wms_test;
```

```
# ADD EXTRACT pum_wpkg,EXTTRAILSOURCE /opt/ggs/dirdat/W3
# ADD RMTTRAIL /opt/ggs/dirdat/W3, EXTRACT pum_wpkg
```

Note: /opt/ggs/dirdat is the trail storage path on the target side.
- Configure defgen, which generates the table definition (column mapping) file:

```
# edit param defgen_wpkg
defsfile /opt/ggs/dirdef/defgen_wpkg.prm
sourcedb [email protected]:3306,userid root,password 123456
table test.wms_test;
```

- Generate the definitions file from the OGG root directory:

```
./defgen paramfile /opt/ggs/dirprm/defgen_wpkg.prm
```

Note: copy the dirdef/defgen_wpkg.prm file to the dirdef/ directory on the target side.
- Start Extract and Pump, then check the process status:

```
# start ext_wpkg
# start pum_wpkg
# info all
```
- View the Extract statistics:

```
# stats EXT_WPKG
Sending STATS request to EXTRACT EXT_WPKG ...
Start of Statistics at 2017-12-12 14:55:43.
Output to /opt/ggs/dirdat/W3:
Extracting from test.wms_test to test.wms_test:
*** Total statistics since 2017-12-12 11:08:40 ***
        Total inserts                1.00
        Total updates                1.00
        Total befores                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
*** Daily statistics since 2017-12-12 11:08:40 ***
        Total inserts                1.00
        Total updates                1.00
        Total befores                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
```
- View the Pump statistics:

```
Sending STATS request to EXTRACT PMP ...
Start of Statistics at 2017-12-12 15:00:05.
Output to /opt/ggs/dirdat/W3:
Extracting from test.wms_test to test.wms_test:
*** Total statistics since 2017-12-12 11:08:43 ***
        Total inserts                1.00
        Total updates                1.00
        Total befores                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
*** Daily statistics since 2017-12-12 11:08:43 ***
        Total inserts                1.00
        Total updates                1.00
        Total befores                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
```
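The stats output above is plain text; when scripting health checks, it is handy to parse the counters into a dict. A sketch that relies on the `Total <name> <count>` layout shown above (the regex and function name are my own, not part of GGSCI):

```python
import re

def parse_ogg_stats(text):
    """Parse 'Total <name> <count>' counter lines from GGSCI stats output."""
    stats = {}
    for m in re.finditer(r"Total (\w+)\s+([\d.]+)", text):
        # Later sections (e.g. daily stats) overwrite earlier ones.
        stats[m.group(1)] = float(m.group(2))
    return stats

sample = """*** Total statistics since 2017-12-12 11:08:40 ***
Total inserts 1.00
Total updates 1.00
Total deletes 1.00
Total discards 0.00
Total operations 3.00"""
print(parse_ogg_stats(sample)["operations"])  # 3.0
```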
Target Configuration
- Start the GGSCI console and run CREATE SUBDIRS to create the working directories:

```shell
$GGS_HOME/ggsci
```

- Configure the Manager:

```
# edit param mgr
PORT 17809
DYNAMICPORTLIST 17810-17909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS /opt/ggs/dirdat/*,usecheckpoints, minkeepdays 3
```
- Configure the Replicat:

```
# edit param REP_WPKG
REPLICAT rep_wpkg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test.*,TARGET test.*;
```

Then register it against the remote trail:

```
add replicat rep_wpkg, exttrail /opt/ggs/dirdat/W3,begin now
```
Configure Kafka
- dirprm/kafka.props:

```
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json_row
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/kafka_2.10-0.10.0.1/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ggs/ggjava/ggjava.jar
```

- gg.handler.kafkahandler.topicMappingTemplate controls the Kafka topic name. It can be a fixed topic name, or use a placeholder such as ${tableName}, in which case each table gets its own topic.
- gg.handler.kafkahandler.SchemaTopicName is the topic that receives the tables' schema information.
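The placeholder resolution can be pictured as simple template substitution over an operation's metadata. This is an illustrative sketch, not the handler's actual implementation; the two keywords match the properties above, and the `op` dict mirrors the JSON messages shown later:

```python
import re

def resolve_template(template, op):
    """Substitute ${...} placeholders from an operation's metadata."""
    fields = {
        "tableName": op["table"].split(".")[-1],   # short table name
        "fullyQualifiedTableName": op["table"],
        "primaryKeys": "_".join(op["primary_keys"]),
    }
    return re.sub(r"\$\{(\w+)\}", lambda m: fields[m.group(1)], template)

op = {"table": "TEST.WMS_TEST", "primary_keys": ["id"]}
print(resolve_template("${tableName}", op))    # WMS_TEST
print(resolve_template("${primaryKeys}", op))  # id
```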
- Kafka connection settings, custom_kafka_producer.properties:

```
bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 16KB per-partition batch
batch.size=16384
linger.ms=0
```
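Both files use Java .properties syntax; if you script deployment checks, a minimal parser is enough to assert on individual keys. A sketch that ignores the format's full escaping rules:

```python
def parse_properties(text):
    """Parse simple key=value .properties content, skipping comments and blanks."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

cfg = parse_properties("bootstrap.servers=localhost:9092\nacks=1\n# comment\nlinger.ms=0")
print(cfg["acks"])  # 1
```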
- Start the Manager and Replicat:

```
start mgr
start replicat REP_WPKG
```

- View the Replicat statistics:

```
# stats REP_WPKG
Sending STATS request to REPLICAT REP_WPKG ...
Start of Statistics at 2017-12-12 15:02:14.
Replicating from test.wms_test to test.wms_test:
*** Total statistics since 2017-12-12 11:10:41 ***
        Total inserts                1.00
        Total updates                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
*** Daily statistics since 2017-12-12 11:10:41 ***
        Total inserts                1.00
        Total updates                1.00
        Total deletes                1.00
        Total discards               0.00
        Total operations             3.00
```
Viewing Messages in Kafka
Two topics are created on the Kafka side:
- mySchemaTopic, which receives the schema information. Consuming it:

```shell
kafka-console-consumer.sh --zookeeper localhost:2181 --topic mySchemaTopic --from-beginning
```

```json
{
  "$schema":"http://json-schema.org/draft-04/schema#",
  "title":"TEST.WMS_TEST",
  "description":"JSON schema for table TEST.WMS_TEST",
  "definitions":{
    "tokens":{
      "type":"object",
      "description":"Token keys and values are free form key value pairs.",
      "properties":{},
      "additionalProperties":true
    }
  },
  "type":"object",
  "properties":{
    "table":{"description":"The fully qualified table name","type":"string"},
    "op_type":{"description":"The operation type","type":"string"},
    "op_ts":{"description":"The operation timestamp","type":"string"},
    "current_ts":{"description":"The current processing timestamp","type":"string"},
    "pos":{"description":"The position of the operation in the data source","type":"string"},
    "primary_keys":{
      "description":"Array of the primary key column names.",
      "type":"array",
      "items":{"type":"string"},
      "minItems":0,
      "uniqueItems":true
    },
    "tokens":{"$ref":"#/definitions/tokens"},
    "id":{"type":["integer","null"]},
    "first_name":{"type":["string","null"]},
    "last_name":{"type":["string","null"]},
    "sex":{"type":["string","null"]},
    "address":{"type":["string","null"]},
    "flag":{"type":["integer","null"]}
  },
  "required":["table","op_type","op_ts","current_ts","pos"],
  "additionalProperties":false
}
```
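A consumer can recover the table's column list from this schema message by filtering out the operation metadata fields. A small sketch (the metadata field names follow the schema above; `table_columns` is my own helper, not an OGG API):

```python
import json

# Operation metadata present in every message, as defined by the schema above.
METADATA = {"table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "tokens"}

def table_columns(schema_json):
    """Return {column: allowed JSON types} for the data columns of an OGG schema message."""
    schema = json.loads(schema_json)
    return {name: spec["type"]
            for name, spec in schema["properties"].items()
            if name not in METADATA}

sample = json.dumps({"properties": {
    "table": {"type": "string"},
    "op_type": {"type": "string"},
    "id": {"type": ["integer", "null"]},
    "flag": {"type": ["integer", "null"]},
}})
print(table_columns(sample))  # {'id': ['integer', 'null'], 'flag': ['integer', 'null']}
```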
- wms_test: each table maps to its own topic (several tables can also share one topic). Consuming the change log:

```shell
kafka-console-consumer.sh --zookeeper localhost:2181 --topic wms_test --from-beginning
```

- Insert a row:

```sql
INSERT INTO `test`.`wms_test` (`first_name`, `last_name`, `sex`, `address`, `flag`)
VALUES ('a', 'b', 'c', 'd', '1');
```

Message received on the topic:

```json
{
  "table":"TEST.WMS_TEST",
  "op_type":"I",
  "op_ts":"2017-12-12 09:47:44.349517",
  "current_ts":"2017-12-12T17:47:50.415000",
  "pos":"00000000000000003880",
  "id":16,
  "first_name":"a",
  "last_name":"b",
  "sex":"c",
  "address":"d",
  "flag":1
}
```

- Update a row:

```sql
UPDATE `test`.`wms_test` SET `flag`='2' WHERE `id`='16';
```

Message received on the topic:

```json
{
  "table":"TEST.WMS_TEST",
  "op_type":"U",
  "op_ts":"2017-12-12 09:49:27.349051",
  "current_ts":"2017-12-12T17:49:32.457000",
  "pos":"00000000000000004268",
  "id":16,
  "first_name":"a",
  "last_name":"b",
  "sex":"c",
  "address":"d",
  "flag":2
}
```

- Delete a row:

```sql
DELETE FROM `test`.`wms_test` WHERE `id`='16';
```

Message received on the topic:

```json
{
  "table":"TEST.WMS_TEST",
  "op_type":"D",
  "op_ts":"2017-12-12 09:51:00.348680",
  "current_ts":"2017-12-12T17:51:06.491000",
  "pos":"00000000000000004376",
  "id":16,
  "first_name":"a",
  "last_name":"b",
  "sex":"c",
  "address":"d",
  "flag":2
}
```
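Downstream consumers typically switch on op_type to materialize this stream back into a table. A minimal sketch using the message shapes shown above (the in-memory dict stands in for a real sink, and `apply_op` is my own helper):

```python
import json

def apply_op(table, msg):
    """Apply one OGG json_row message to a dict keyed by primary key."""
    op = json.loads(msg)
    # Strip the operation metadata, keeping only the row's columns.
    row = {k: v for k, v in op.items()
           if k not in ("table", "op_type", "op_ts", "current_ts", "pos")}
    if op["op_type"] in ("I", "U"):
        table[row["id"]] = row        # insert or overwrite
    elif op["op_type"] == "D":
        table.pop(row["id"], None)    # tolerate deletes of unknown keys

rows = {}
apply_op(rows, '{"table":"TEST.WMS_TEST","op_type":"I","op_ts":"t","current_ts":"t","pos":"p","id":16,"flag":1}')
apply_op(rows, '{"table":"TEST.WMS_TEST","op_type":"U","op_ts":"t","current_ts":"t","pos":"p","id":16,"flag":2}')
print(rows[16]["flag"])  # 2
apply_op(rows, '{"table":"TEST.WMS_TEST","op_type":"D","op_ts":"t","current_ts":"t","pos":"p","id":16,"flag":2}')
print(rows)  # {}
```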
Troubleshooting
- Error loading Java VM runtime library: (2 No such file or directory)

  Solution: add the JVM library directories to LD_LIBRARY_PATH (it takes directories, not individual .so files):

```shell
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
```
- DB2 installation error: /ggsci: error while loading shared libraries: libdb2.so.1: cannot open shared object file: No such file or directory

  Solution: add the DB2 library path:

```shell
export LD_LIBRARY_PATH=$DB2_HOME/lib64/:$LD_LIBRARY_PATH
```
Common Kafka Commands
- Start ZooKeeper:

```shell
bin/zookeeper-server-start.sh config/zookeeper.properties &
```

- Start Kafka:

```shell
bin/kafka-server-start.sh config/server.properties &
```

- Create a topic:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

- Run a console producer:

```shell
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

- Run a console consumer:

```shell
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```

- List existing topics:

```shell
bin/kafka-topics.sh --list --zookeeper localhost:2181
```
References
- Introducing the Java Adapter
- https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-handler.htm#GADBD451
- Understanding the Oracle GoldenGate checkpoint mechanism
- Common GoldenGate operations
- Oracle GoldenGate MySQL-to-Kafka sync configuration
- https://blogs.oracle.com/dataintegration/goldengate-for-big-data-123-is-released
- http://dbaoracle4hire.blogspot.com/2017/08/nueva-version-de-oracle-goldengate-para.html
- Learn GoldenGate
- GoldenGate basic architecture
- Oracle GoldenGate for DB2
Original article by Maggie-Hunter. If reproduced, please credit the source: https://blog.ytso.com/9551.html