# CDC增量同步(增量)
使用示例
# 说明
用于将 关系型数据库的数据迁移到关系型数据库。实时监测源表数据变化,实时同步到目标表中。模型运行成功后会一直处于运行状态。
⚠️ 提示:在作业执行过程中,当目标表不存在时,会自动创建表,然后再将源表数据同步至目标表。
增量同步与多表迁移的向导界面类似,且都支持多表增量同步。与多表迁移的不同点在于,增量同步不是一次性将数据从源表同步到目标表,而是源表的数据发生变化(增加、修改、删除)时才会同步到目标表。
⚠️ 提示:
1、源表和目标表都必须定义主键;
2、目标表不存在时,模型运行时会自动创建表,和源表结构一致;
3、目标表存在时,要确保源表与目标表的数据结构、字段类型一致,且定义主键。同时源表开启了 cdc 配置。
增量同步支持的数据源如下:
数据源类型 | 版本 | 驱动 |
---|---|---|
MySQL-CDC | MySQL: 5.6,5.7,8.0.x | JDBC Driver: 8.0.28 |
PostgreSQL-CDC | PostgresSQL: 10, 11, 12, 13,14 | JDBC Driver: 42.5.1 |
SQLServer-CDC | Sqlserver: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
MongoDB-CDC | MongoDB: 3.12.11 | JDBC Driver: 3.12.11 |
Oracle-CDC | Oracle: 12c | JDBC Driver: ojdbc8 |
# 配置项说明
配置项 | 是否必填 | 默认值 | 描述 |
---|---|---|---|
选择数据源(来源) | 是 | - | 来源数据的数据源名称,该选项是已经在项目配置中添加过的数据源。 |
选择库(来源) | 是 | - | 来源数据源中的数据库名称。选择来源数据源后自动读取并显示数据库名称,可以直接选择。 |
选择数据源(目标) | 是 | - | 目标数据的数据源名称,该选项是已经在项目配置中添加过的数据源。 |
选择库(目标) | 是 | - | 目标数据源中的数据库名称。选择目标数据源后自动读取并显示数据库名称,可以直接选择。 |
获取表信息 | 否 | - | 如果数据库表发生变化,点击此按钮获取最新的表字段。添加表的弹窗会显示最新的表名称。 |
表名称映射 | 是 | - | 来源表与目标表名称的映射,默认是同名映射(即:目标表与来源表名称一样)。 |
前缀/后缀 | 否 | - | 给目标表名称添加前缀或者后缀,用于批量修改目标表的名称。 |
同步 | 是 | 同步 | 是否执行同步的开关,有两个选项:同步、不同步。 同步:表示执行,来源表数据会同步到目标表; 不同步:表示不执行,来源表数据不会同步到目标表。 |
数据添加模式 | 是 | 同步前删除数据 | 支持三种模式: 1、追加数据:数据统一追加到目标表中; 2、同步前删除数据:先清空目标表数据,然后再给目标表添加数据; 3、当存在数据时,报告错误:当目标表存在相同数据时,报告错误,不会添加数据。如果添加了两张表(或多表),若一张目标表有数据,一张目标表无数据,则运行结果会报告错误,两张目标表都不会添加数据。 提示:全量同步的默认值都是:同步前删除数据。 |
并行度(通用配置中) | 是 | 1 | 配置Source和Sink的并行数。 |
# 数据库CDC配置说明
CDC 增量同步作业需要提前对数据库进行cdc配置,具体的配置方法如下:
# MySQL-CDC 配置
1、修改 MySQL 配置,开启 binlog 功能,并设置 binlog 格式为 row 模式。
binlog 是 MySQL 的二进制日志文件,它记录了数据库中的所有变更操作,Mysql CDC Connector 会通过 binlog 来捕获数据的变化。
参考配置如下:
[mysqld]
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on
2、添加权限(如果是非 root 用户)
MySQL Binlog 权限需要三个权限 SELECT, REPLICATION SLAVE, REPLICATION CLIENT:
Select 权限代表允许从表中查看数据。
Replication client 权限代表允许执行show master status,show slave status,show binary logs 命令。
Replication slave 权限代表允许slave主机通过此用户连接master以便建立主从复制关系。
3、执行命令(如果是非 root 用户)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dws'@'%' IDENTIFIED BY 'dws';
# PostgreSQL-CDC 配置
1、修改 postgresql.conf 配置文件,参考配置如下:
# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s
2、开启权限
-- 更改复制标识包含更新和删除之前值
ALTER TABLE table_name REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功)
select relreplident from pg_class where relname='table_name';
# SQL Server-CDC 配置
1、开启SQL Server CDC的必要条件
- 需要开启数据库 CDC
- 需要开启表 CDC
- 需要开启代理服务
- 磁盘要有足够的空间,保存日志文件
- 表必须要有主键或者是唯一索引
2、参考文档:https://support.huaweicloud.com/intl/zh-cn/usermanual-roma/fdi-ug-202009081.html
# MongoDB-CDC 配置
1、开启Mongodb CDC的必要条件
- MongoDB version: MongoDB version >= 4.0.
- Cluster deployment: replica sets or sharded clusters
- Storage Engine: WiredTiger Storage Engine.
- Permissions:changeStream and read(执行如下语句)
use admin;
db.createRole(
{
role: "strole",
privileges: [{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
{ role: 'read', db: 'config' }
]
}
);
db.createUser(
{
user: 'stuser',
pwd: 'stpw',
roles: [
{ role: 'strole', db: 'admin' }
]
}
);
2、配置 MongoDB 副本集 创建 mongo1.conf,参考配置如下:
# 端口默认
port = 27017
#数据目录
dbpath = /usr/local/Cellar/mongodb-community/7.0.0/data/db
#日志所在目录
logpath = /usr/local/Cellar/mongodb-community/7.0.0/data/logs/mongodb.log
#日志输出方式
logappend = true
# 在后台启动
fork=true
# 0.0.0.0 表示任意IP均可连接
bind_ip=0.0.0.0
#副本集名称
replSet=rs0
3、启动 MongoDB 节点,参考命令如下:
cd /usr/local/Cellar/mongodb-community/7.0.0/bin
启动命令:
mongod -f mongo.conf
4、初始化副本集
# 连接mongodb
./mongo mongodb://127.0.0.1:27017
use admin
# 设置变量
rs0= {
_id: "rs0",
members: [
{
_id: 0,
host: "127.0.0.1:27017"
},
]
}
# 执行初始化
rs.initiate(rs0)
# Oracle-CDC 配置
1、开启Oracle CDC(LogMiner) 在 Seatunnel 中使用 Oracle CDC 同步数据需为 Oracle 数据库开启 LogMiner(Oracle提供的内置工具)模式的 CDC(Change Data Capture) 功能。请遵循以下步骤:
Oracle为非CDB数据库时的配置 创建空文件目录存放Oracle归档日志和用户表空间。
mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
chown -R oracle /opt/oracle/***
1)启用日志归档
以管理员身份登录并启用Oracle归档日志。
sqlplus /nolog;
connect sys as sysdba;
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
archive log list;
2)用户赋权
以管理员身份登录,创建一个名为logminer_user的帐户,密码为“oracle”,并授予该帐户读取表和日志的权限。
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE to logminer_user;
GRANT SELECT ON V_$LOG TO logminer_user;
GRANT SELECT ON V_$LOGFILE TO logminer_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
GRANT LOGMINING TO logminer_user;
GRANT SELECT ANY TABLE TO logminer_user;
GRANT ANALYZE ANY TO logminer_user;
2、SeaTunnel Oracle CDC 配置示例
{
"env" : {
"execution.parallelism" : 1,
"job.mode" : "STREAMING",
"checkpoint.interval" : 5000
},
"source" : [
{
"base-url" : "jdbc:oracle:thin:@192.168.2.58:1521:orcl",
"password" : "oracle",
"debezium" : {
"database.oracle.jdbc.timezoneAsRegion" : "false"
},
"connection.pool.size" : 5,
"table-names" : [
"ORCL.lOGMINER_USER.EAST5_409_GRXDFHZ_POC"
],
"result_table_name" : "customers",
"database-names" : [
"ORCL"
],
"schema-names" : [
"lOGMINER_USER"
],
"source.reader.close.timeout" : 120000,
"plugin_name" : "Oracle-CDC",
"username" : "logminer_user"
}
],
"transform" : [],
"sink" : [
{
"password" : "primeton000000",
"database" : "ORCL",
"batch_size" : 1,
"primary_keys" : [
"ID"
],
"driver" : "oracle.jdbc.driver.OracleDriver",
"source_table_name" : "customers",
"generate_sink_sql" : true,
"plugin_name" : "Jdbc",
"user" : "C##DSP6GA_M",
"url" : "jdbc:oracle:thin:@//192.168.16.96:1521/ORCL",
"table" : "C##DSP6GA_M.EAST5_409_GRXDFHZ_POC"
}
]
}
# FAQ
无