1. CdcCli外部库提供Cdc外部库函数;外部库文件路径为:[安装目录]\esProc\extlib\CdcCli。
2. CdcCli要求java环境为jre1.7及以上版本。如果使用自定义jre,那么需要在[安装目录]\esProc\bin\config.txt文件中配置java_home,否则此步可以省略。
3. 使用CdcCli外部库,需要搭配数据同步工具。
DebeziumCli目录下([安装目录]\esProc\extlib\DebeziumCli)提供数据同步工具,使用该工具可以通过Debezium将数据库中的数据增删改操作存储到复组表中(目前支持MySQL、PostgreSQL);Debezium支持3.0.0及以上版本。
润乾外部库核心jar为[安装目录]\esProc\extlib\CdcCli \scu-cdc-cli-2.10.jar, [安装目录]\esProc\extlib\DebeziumCli \scu-debezium-2.10.jar。
除外部库核心jar,DebeziumCli目录下还提供以下文件:
antlr4-runtime-4.10.1.jar
connect-api-3.8.0.jar
connect-json-3.8.0.jar
connect-runtime-3.8.0.jar
debezium-api-3.0.0.Final.jar
debezium-connector-binlog-3.0.0.Final.jar
debezium-connector-mysql-3.0.0.Final.jar
debezium-connector-postgres-3.0.0.Final.jar
debezium-core-3.0.0.Final.jar
debezium-ddl-parser-3.0.0.Final.jar
debezium-embedded-3.0.0.Final.jar
debezium-storage-file-3.0.0.Final.jar
debezium-storage-kafka-3.0.0.Final.jar
fastjson-1.2.76.jar
jackson-annotations-2.16.2.jar
jackson-core-2.16.2.jar
jackson-databind-2.16.2.jar
jackson-module-afterburner-2.16.2.jar
kafka-clients-3.8.0.jar
log4j-1.2.17.jar
mysql-binlog-connector-java-0.31.0.jar
mysql-connector-j-9.0.0.jar
postgresql-42.2.14.jar
slf4j-api-1.7.30.jar
slf4j-reload4j-1.7.36.jar
注:以上第三方依赖jar,外部库压缩包中默认已放置,用户可根据实际应用环境灵活使用。
另外存在以下文件:
log4j.properties
start_server.bat
4. 下面介绍数据同步工具的使用方式:
(1) 修改start_server.bat中的java路径,需要使用jre11及以上版本,start_server.bat内容如下:
set EXECJAVA="D:\Java\jdk-11\bin\java"
%EXECJAVA% -Xms128m -Xmx1024m -cp .;*;../../lib/*; com.scudata.lib.debezium.DebeziumCollect %1 %2
(2) 在start_server.bat同级目录下,根据需求新建配置文件mysql.properties或postgres.properties,配置文件内容如下:
#连接器唯一名称,自定义
name=testName
#连接器类的名称,不同的数据库类名不同,这里以MySQL数据库为例
connector.class=io.debezium.connector.mysql.MySqlConnector
#基于文件的offset持久化
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=F:/tmp/dbz/storage/mysql_offsets.dat
offset.flush.interval.ms=60000
#数据库服务器名称
database.server.name=myDatabase
#当前数据库的数字ID,默认随机
database.server.id=59059
#数据库IP地址和端口
database.hostname=localhost
database.port=3306
#连接数据库的用户名和密码
database.user=username
database.password=password
#数据库名称
database.dbname =myDBname
#设置时区
database.connectionTimeZone=GMT+8
#监听的数据表列表,以“,”分割。
table.include.list=myDBname.table1,myDBname.table2
#自定义表级别topic的名称
topic.prefix=sync
#复制槽名称
slot.name=slotName
#指定数据库架构历史存储的位置
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=F:/tmp/dbz/storage/schemahistory.dat
#设置快照方式
snapshot.mode=always
#使用第三方接口处理数据类型
decimal.handling.mode=double
numeric.handling.mode=numeric
#数据转换,设置时间类型格式
converters=scudataConverter
scudataConverter.type=com.scudata.lib.debezium.ScudataConverter
scudataConverter.schema.name=io.debezium.scudata.type.scudataConverter
scudataConverter.format.date=yyyy-MM-dd
scudataConverter.format.time=HH:mm:ss
scudataConverter.format.datetime=yyyy-MM-dd HH:mm:ss
scudataConverter.format.timestamp=yyyy-MM-dd HH:mm:ss
scudataConverter.format.timestamptz=yyyy-MM-dd HH:mm:ss
scudataConverter.format.timestamp.zone=GMT+8
scudataConverter.format.timestamptz.zone=UTC
(3) 启动MySQL或PostgreSQL数据库。
(4) 启动数据同步工具,启动语句及参数解析如下:
start_server.bat 参数1 参数2
参数1:存放同步数据文件的目录,可以是绝对路径或相对路径。
操作数据后,该路径下将自动生成dirout目录存放复组表,缺省为当前目录,且此参数不可单独省略;
参数2:需要连接的.properties配置文件,缺省为mysql.properties。
如:start_server.bat F:/tmp/myTest postgres.properties表示使用postgres.properties配置文件,将同步数据生成的文件存放在指定目录F:/tmp/myTest/dirout下。
(5) 启动后,对数据库中的数据进行增删改操作,其数据变化将记录在dirout目录下生成的复组表中:
“postgres.employee_2024_12.ctx”记录数据库postgre中employee表在2024年12月份所有的数据变化,下个月将新建一组复组表进行记录。
5. 读取数据同步后生成的复组表可使用的外部库函数有cdc_collect()、cdc_merge()。函数用法请参考【帮助】-【函数参考】。