Cdc

阅读(37) 标签: debezium, cdc, mysql, postgresql,

1. CdcCli外部库提供Cdc外部库函数;外部库文件路径为:[安装目录]\esProc\extlib\CdcCli

2. CdcCli要求java环境为jre1.7及以上版本。如果使用自定义jre,那么需要在[安装目录]\esProc\bin\config.txt文件中配置java_home,否则此步可以省略。

3. 使用CdcCli外部库,需要搭配数据同步工具。

DebeziumCli目录下([安装目录]\esProc\extlib\DebeziumCli)提供数据同步工具,使用该工具可以通过Debezium将数据库中的数据增删改操作存储到复组表中(目前支持MySQLPostgreSQL);Debezium支持3.0.0及以上版本。

润乾外部库核心jar[安装目录]\esProc\extlib\CdcCli \scu-cdc-cli-2.10.jar, [安装目录]\esProc\extlib\DebeziumCli \scu-debezium-2.10.jar

除外部库核心jarDebeziumCli目录下还提供以下文件:

antlr4-runtime-4.10.1.jar

connect-api-3.8.0.jar

connect-json-3.8.0.jar

connect-runtime-3.8.0.jar

debezium-api-3.0.0.Final.jar

debezium-connector-binlog-3.0.0.Final.jar

debezium-connector-mysql-3.0.0.Final.jar

debezium-connector-postgres-3.0.0.Final.jar

debezium-core-3.0.0.Final.jar

debezium-ddl-parser-3.0.0.Final.jar

debezium-embedded-3.0.0.Final.jar

debezium-storage-file-3.0.0.Final.jar

debezium-storage-kafka-3.0.0.Final.jar

fastjson-1.2.76.jar

jackson-annotations-2.16.2.jar

jackson-core-2.16.2.jar

jackson-databind-2.16.2.jar

jackson-module-afterburner-2.16.2.jar

kafka-clients-3.8.0.jar

log4j-1.2.17.jar

mysql-binlog-connector-java-0.31.0.jar

mysql-connector-j-9.0.0.jar

postgresql-42.2.14.jar

slf4j-api-1.7.30.jar

slf4j-reload4j-1.7.36.jar

注:以上第三方依赖jar,外部库压缩包中默认已放置,用户可根据实际应用环境灵活使用。

另外存在以下文件:

log4j.properties

start_server.bat

4. 下面介绍数据同步工具的使用方式:

(1) 修改start_server.bat中的java路径,需要使用jre11及以上版本,start_server.bat内容如下:

set EXECJAVA="D:\Java\jdk-11\bin\java"

%EXECJAVA% -Xms128m -Xmx1024m  -cp .;*;../../lib/*; com.scudata.lib.debezium.DebeziumCollect %1 %2

(2) start_server.bat同级目录下,根据需求新建配置文件mysql.propertiespostgres.properties,配置文件内容如下:

#连接器唯一名称,自定义

name=testName

#连接器类的名称,不同的数据库类名不同,这里以MySQL数据库为例

connector.class=io.debezium.connector.mysql.MySqlConnector

#基于文件的offset持久化

offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore

offset.storage.file.filename=F:/tmp/dbz/storage/mysql_offsets.dat

offset.flush.interval.ms=60000

#数据库服务器名称

database.server.name=myDatabase

#当前数据库的数字ID,默认随机

database.server.id=59059

#数据库IP地址和端口

database.hostname=localhost

database.port=3306

#连接数据库的用户名和密码

database.user=username

database.password=password

#数据库名称

database.dbname =myDBname

#设置时区

database.connectionTimeZone=GMT+8

#监听的数据表列表,以“,”分割。

table.include.list=myDBname.table1,myDBname.table2

#自定义表级别topic的名称

topic.prefix=sync

#复制槽名称

slot.name=slotName

#指定数据库架构历史存储的位置

schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory

schema.history.internal.file.filename=F:/tmp/dbz/storage/schemahistory.dat 

#设置快照方式

snapshot.mode=always

#使用第三方接口处理数据类型 

decimal.handling.mode=double

numeric.handling.mode=numeric

#数据转换,设置时间类型格式

converters=scudataConverter

scudataConverter.type=com.scudata.lib.debezium.ScudataConverter

scudataConverter.schema.name=io.debezium.scudata.type.scudataConverter

scudataConverter.format.date=yyyy-MM-dd

scudataConverter.format.time=HH:mm:ss

scudataConverter.format.datetime=yyyy-MM-dd HH:mm:ss

scudataConverter.format.timestamp=yyyy-MM-dd HH:mm:ss

scudataConverter.format.timestamptz=yyyy-MM-dd HH:mm:ss

scudataConverter.format.timestamp.zone=GMT+8

scudataConverter.format.timestamptz.zone=UTC

(3) 启动MySQLPostgreSQL数据库。

(4) 启动数据同步工具,启动语句及参数解析如下:

start_server.bat 参数1 参数2

参数1:存放同步数据文件的目录,可以是绝对路径或相对路径。

操作数据后,该路径下将自动生成dirout目录存放复组表,缺省为当前目录,且此参数不可单独省略;

参数2:需要连接的.properties配置文件,缺省为mysql.properties

如:start_server.bat F:/tmp/myTest postgres.properties表示使用postgres.properties配置文件,将同步数据生成的文件存放在指定目录F:/tmp/myTest/dirout下。

(5) 启动后,对数据库中的数据进行增删改操作,其数据变化将记录在dirout目录下生成的复组表中:

postgres.employee_2024_12.ctx”记录数据库postgreemployee表在202412月份所有的数据变化,下个月将新建一组复组表进行记录。

5. 读取数据同步后生成的复组表可使用的外部库函数有cdc_collect()cdc_merge()。函数用法请参考【帮助】-【函数参考】。