# CDC连接器组件使用说明
# 组件说明
CDC Source连接器,用于捕获数据库的全量快照 + 实时增量变更(INSERT/UPDATE/DELETE),支持断点续传、无锁快照、Exactly-Once 语义。
其核心价值在于:用一套配置解决“多表实时同步+自动路由+断点续传”问题,并且任务一旦启动就是常驻服务。
它是一种流式数据集成模式。任务以job.mode = "STREAMING"运行,先通过全量快照同步历史数据,再无缝切换到增量监听(Binlog、WAL、CDC表),实时捕获INSERT/UPDATE/DELETE并同步下游。
# 支持的数据库与版本
- MySQL(5.7+/8.0+,基于 Binlog)
- PostgreSQL(10+,基于 WAL)
- Oracle(11g+,基于LogMiner/XStream)
- SQL Server(2016+,基于 CDC 表)
- MongoDB(3.6+,基于 Change Streams)
- TiDB(8.0.20,基于 TiCDC)
# 事件类型
SeaTunnel 将 CDC 变更事件抽象为4种RowKind:
- INSERT(+I):新增
- UPDATE_BEFORE(-U):更新前镜像(部分场景可省略)
- UPDATE_AFTER(+U):更新后镜像
- DELETE(-D):删除
# 配置项说明
| 配置名称 | 数据类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| 节点名称 | String | 是 | CDC | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 节点编码 | String | 是 | 自动生成 | 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 选择数据源 | String | 是 | - | 从下拉选项中选择列出的当前项目已经关联的数据源。 |
| 选择同步数据库 | List | 否 | - | 从下拉列表中选择数据库,可以多选。 |
| 选择同步数据表 | List | 否 | - | 从下拉列表中选择数据表,可以多选。 |
| serverid | String | 否 | - | 数据库客户端的数字ID或数字ID范围,语法类似“5400”,数字ID范围语法类似“5400-5408”。在数据库集群中所有当前运行的数据库进程中,每个ID都必须是唯一的。 默认生成的随机数介于5400和6400之间。 |
| 启动模式 | String | 否 | initial | 有五种:initial、earliest、latest、specific、timestamp。 initial:启动时先同步历史数据,再同步增量数据。 earliest:从最开始的偏移量同步。 latest:从最后的偏移量同步。 specific:从用户提供的特定偏移量同步。 timestamp:从某一时间戳同步数据。 |
| 时间戳 | String | 是 | - | 当“启动模式“选择“timestamp”时,需要指定时间戳,单位为:毫秒。 |
| 增量读取阶段并行度 | String | 否 | - | 增量阶段中的并行读卡器的数量。 |
| 数据输出格式 | Enum | 否 | Default | CDC 的可选输出格式,有效的枚举是Default、Compatible Debezium Json。 |
| Debezium相关的配置参数 | Config | 否 | - | 将 Debezium 的属性传递到 Debezium 嵌入式引擎,该引擎用于从数据库服务器捕获数据更改。用户可以根据需求进行配置。 |
| 启用精确一次语义 | Boolean | 否 | false | Boolean 类型:是、否。是否启用一次语义。 |
| 可选参数 | - | 否 | - | 其他参数,用户可以根据需求进行配置。 |
# 关键特性与注意事项
- CDC 必须用 STREAMING 模式
- 断点续传:Checkpoint 记录位点,重启不丢数据
- 无锁快照:snapshot.locking.mode = none 降低源库影响
- Exactly-Once:依赖 Checkpoint 与分布式快照
- 主键要求:必须有主键,否则无法处理 UPDATE/DELETE
- 动态加表:运行中可新增表,无需重启
- 数据格式:输出含 op(c/u/d)、before、after 字段
# 功能限制
CDC模式下,Source 端不支持 query 或自定义 SELECT 语句。原因是 CDC 读取的是 Binlog 事件流,而非 SQL 结果集。
不支持无主键表
CDC同步要求表必须有主键。若无主键,更新/删除时无法定位具体行,导致下游数据错乱。
Source层不支持自定义SQL过滤
CDC连接器读取的是Binlog事件流,无法通过query参数写WHERE条件。
# 性能调优
- 限速:read_limit.bytes_per_second 防止拉爆源库
- Checkpoint:CDC 模式建议间隔 10~30 秒,兼顾恢复速度与性能
- 并行度:增量阶段不建议超过 server-id 范围大小
# 运维与常见问题
任务不会自动停止
CDC任务是常驻服务。启动后会一直监听变更,直到手动停止(Ctrl+C或调度平台kill)。
断点续传与Exactly-Once
CDC连接器集成SeaTunnel的Checkpoint机制,会定期持久化当前读取位点(Binlog位置/LSN)。任务重启时从上次记录点恢复,保证数据不丢不重。
Sink端CDC写入语义
以ClickHouse为例,CDC写入需注意:
- 主键必须存在:用于识别UPDATE/DELETE。
- UPSERT模式:可开启,但会降低吞吐(先查询是否存在)。
- 顺序保证:框架通过缓冲区策略保证同一主键的变更按事件顺序执行。
SQL Server专用调优
- 清理期调整:sys.sp_cdc_change_job @job_type='cleanup', @retention=43200(30天)。
- 性能:CDC通过日志读取,对源库压力远低于轮询。
# CDC vs JDBC
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 实时同步业务库到数仓 | CDC(Streaming) | 低延迟,持续监听 |
| T+1报表、历史数据补录 | JDBC(Batch) | 一次性任务,自动结束 |
| 整库迁移 | CDC(多表+变量) | 一个任务同步上百张表 |
| 源表无主键 | 不适用CDC | CDC要求表有主键(识别变更) |
# MySQL CDC 故障快速排查
| 现象 | 常见原因 | 检查项 |
|---|---|---|
| 启动报错无权限 | 用户缺少REPLICATION权限 | SHOW GRANTS |
| 连接超时 | 分片过大/网络抖动 | 增大 connect.timeout.ms |
| 同步丢数据 | server-id 冲突 | 确保每个任务 server-id 全局唯一 |
| 时区差8小时 | 未指定server-time-zone | 强制设为Asia/Shanghai |
| 增量不启动 | Binlog位置失效 | 检查startup.mode是否误设为latest |
# 开启 CDC 说明
增量同步支持的数据源如下:
| 数据源类型 | 版本 | 驱动 |
|---|---|---|
| MySQL-CDC | MySQL: 5.6,5.7,8.0.x | JDBC Driver: 8.0.28 |
| PostgreSQL-CDC | PostgresSQL: 10, 11, 12, 13,14 | JDBC Driver: 42.5.1 |
| SQLServer-CDC | Sqlserver: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
| MongoDB-CDC | MongoDB: 3.12.11 | JDBC Driver: 3.12.11 |
| Oracle-CDC | Oracle: 12c, 19c | JDBC Driver: 19.3.0.0 |
| TiDB-CDC | TiDB: 8.0.20 | JDBC Driver: 8.0.20 |
CDC 增量同步作业需要提前对数据库进行配置,具体的配置方法如下:
# MySQL-CDC 配置
1、修改 MySQL 配置,开启 binlog 功能,并设置 binlog 格式为 row 模式。
binlog 是 MySQL 的二进制日志文件,它记录了数据库中的所有变更操作,Mysql CDC Connector 会通过 binlog 来捕获数据的变化。
参考配置如下:
[mysqld]
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on
2、添加权限(如果是非 root 用户)
MySQL Binlog 权限需要三个权限 SELECT, REPLICATION SLAVE, REPLICATION CLIENT:
Select 权限代表允许从表中查看数据。
Replication client 权限代表允许执行show master status,show slave status,show binary logs 命令。
Replication slave 权限代表允许slave主机通过此用户连接master以便建立主从复制关系。
3、执行命令(如果是非 root 用户)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dws'@'%' IDENTIFIED BY 'dws';
# PostgreSQL-CDC 配置
1、修改 postgresql.conf 配置文件,参考配置如下:
# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s
2、开启权限
-- 更改复制标识包含更新和删除之前值
ALTER TABLE table_name REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功)
select relreplident from pg_class where relname='table_name';
# SQL Server-CDC 配置
1、开启SQL Server CDC的必要条件
- 需要开启数据库 CDC
- 需要开启表 CDC
- 需要开启代理服务
- 磁盘要有足够的空间,保存日志文件
- 表必须要有主键或者是唯一索引
2、参考文档:https://support.huaweicloud.com/intl/zh-cn/usermanual-roma/fdi-ug-202009081.html
oracle
# MongoDB-CDC 配置
1、开启Mongodb CDC的必要条件
- MongoDB version: MongoDB version >= 4.0.
- Cluster deployment: replica sets or sharded clusters
- Storage Engine: WiredTiger Storage Engine.
- Permissions:changeStream and read(执行如下语句)
use admin;
db.createRole(
{
role: "strole",
privileges: [{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
{ role: 'read', db: 'config' }
]
}
);
db.createUser(
{
user: 'stuser',
pwd: 'stpw',
roles: [
{ role: 'strole', db: 'admin' }
]
}
);
2、配置 MongoDB 副本集 创建 mongo1.conf,参考配置如下:
# 端口默认
port = 27017
#数据目录
dbpath = /usr/local/Cellar/mongodb-community/7.0.0/data/db
#日志所在目录
logpath = /usr/local/Cellar/mongodb-community/7.0.0/data/logs/mongodb.log
#日志输出方式
logappend = true
# 在后台启动
fork=true
# 0.0.0.0 表示任意IP均可连接
bind_ip=0.0.0.0
#副本集名称
replSet=rs0
3、启动 MongoDB 节点,参考命令如下:
cd /usr/local/Cellar/mongodb-community/7.0.0/bin
启动命令:
mongod -f mongo.conf
4、初始化副本集
# 连接mongodb
./mongo mongodb://127.0.0.1:27017
use admin
# 设置变量
rs0= {
_id: "rs0",
members: [
{
_id: 0,
host: "127.0.0.1:27017"
},
]
}
# 执行初始化
rs.initiate(rs0)
# Oracle-CDC 配置
1、创建空文件目录存放Oracle归档日志和用户表空间。
mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
chown -R oracle /opt/oracle/***
2、启用日志归档
以管理员身份登录并启用Oracle归档日志。
sqlplus /nolog;
connect sys as sysdba;
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
archive log list;
3、用户赋权
以管理员身份登录,创建一个名为logminer_user的帐户,密码为“oracle”,并授予该帐户读取表和日志的权限。
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE to logminer_user;
GRANT SELECT ON V_$LOG TO logminer_user;
GRANT SELECT ON V_$LOGFILE TO logminer_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
GRANT LOGMINING TO logminer_user;
GRANT SELECT ANY TABLE TO logminer_user;
GRANT ANALYZE ANY TO logminer_user;
# FAQ
常见问题
- 无主键表:不支持,需添加主键或唯一索引
- 快照慢:调大 chunk.size(默认 8096)、fetch.size
- 位点丢失:确保 Checkpoint 正常、目录可写
- 权限不足:源库需 RELOAD、REPLICATION SLAVE 等权限