普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
SDK开发指南
FAQ
  • 通用模板作业
  • 关系型数据库同步(全量)
  • 数据库到Hive同步(全量)
  • 关系型数据库到StarRocks同步(全量)
  • StarRocks到关系型数据库同步(全量)
  • StarRocks到StarRocks同步(全量)
  • StarRocks到Hive同步(全量)
  • StarRocks到MongoDB同步(全量)
  • CDC增量同步(增量)
  • MySQL-CDC 配置
  • PostgreSQL-CDC 配置
  • SQL Server-CDC 配置
  • MongoDB-CDC 配置
  • Oracle-CDC 配置
  • 时间戳关系型数据库增量同步(增量)
  • 时间戳关系型数据库到Hive增量同步(增量)

# 通用模板作业

本章节将详细讲述基于 Seatunnel 引擎的数据同步通用模板的使用。

  • 关系型数据库同步(全量)
  • 关系型数据库到Hive同步(全量)
  • 关系型数据库到StarRocks同步(全量)
  • StarRocks到关系型数据库同步(全量)
  • StarRocks到StarRocks同步(全量)
  • StarRocks到Hive同步(全量)
  • StarRocks到MongoDB同步(全量)
  • CDC增量同步(增量)
  • 时间戳关系型数据库增量同步(增量)
  • 时间戳关系型数据库到Hive增量同步(增量)

全量同步可以实现单表、多表、整库的迁移,将全部数据一次性从源表迁移到目标表。

  • 单表:只选择一张表名,可快速实现两个表之间的数据批量迁移。
  • 多表:选择多张表名,可快速实现多个表之间的数据批量迁移。
  • 整库:全选表名。整库同步是帮助用户提升数据同步效率的工具,通过整库同步可快速实现两个库之间的数据批量迁移,节省任务配置工作,快速提升数据迁移效率。

# 关系型数据库同步(全量)

关系型数据库同步一共有四步:

  • 第一步:创建通用模板作业,选择处理引擎 "Seatunnel",选择"全量同步",选择"数据库同步"模板。
  • 第二步:选择来源库、表名称。
  • 第三步:选择目标库、表名称。
  • 第四步:来源库表名称与目标库表名称的映射。

multi

multi

multi

multi

# 数据库到Hive同步(全量)

数据库到Hive同步:用于数据库的数据迁移到Hive。可以将来源库的多个表或者全部表(整库)的数据迁移到Hive的目标库、表。

Hive库目标表不存在时,会自动创建目标表,可在模型上设置目标表存储格式,支持ORC、PARQUET、TEXTFILE格式;

Hive库目标表存在时,会自动获取表存储格式,支持获取ORC、PARQUET、TEXTFILE格式,其中TEXTFILE格式分隔符支持默认、TAB、逗号。

⚠️ 提示:目标表存在时,要确保源表与目标表的数据结构、字段顺序一致,字段类型相匹配。

multi

multi

# 关系型数据库到StarRocks同步(全量)

关系型数据库到StarRocks同步:用于将关系型数据库的数据迁移到StarRocks。可以将来源库的多个表或者全部表(整库)的数据迁移到 StarRocks 的目标库、表。

# StarRocks到关系型数据库同步(全量)

StarRocks到关系型数据库同步:用于将StarRocks的数据迁移到关系型数据库。可以将 StarRocks 来源库的多个表或者全部表(整库)的数据迁移到关系型数据库的目标库、表。

# StarRocks到StarRocks同步(全量)

StarRocks到StarRocks同步:用于将 StarRocks 的数据迁移到 StarRocks。可以将 StarRocks 来源库的多个表或者全部表(整库)的数据迁移到 StarRocks 的目标库、表。

# StarRocks到Hive同步(全量)

StarRocks到Hive同步:用于将 StarRocks 的数据迁移到 Hive。可以将 StarRocks 来源库的多个表或者全部表(整库)的数据迁移到 Hive 的目标库、表。

# StarRocks到MongoDB同步(全量)

StarRocks到MongoDB同步:用于将 StarRocks 的数据迁移到 MongoDB。可以将 StarRocks 来源库的多个表或者全部表(整库)的数据迁移到 MongoDB 的目标库、表。

# CDC增量同步(增量)

增量同步与多表迁移的向导界面类似,且都支持多表增量同步。与多表迁移的不同点在于,增量同步不是一次性将数据从源表同步到目标表,而是源表的数据发生变化(增加、修改、删除)时才会同步到目标表。

⚠️ 提示:

1、源表和目标表都必须定义主键;

2、目标表不存在时,模型运行时会自动创建表,和源表结构一致;

3、目标表存在时,要确保源表与目标表的数据结构、字段类型一致,且定义主键。同时源表开启了 cdc 配置。

增量同步支持的数据源如下:

数据源类型 版本 驱动
MySQL-CDC MySQL: 5.6,5.7,8.0.x JDBC Driver: 8.0.28
PostgreSQL-CDC PostgresSQL: 10, 11, 12, 13,14 JDBC Driver: 42.5.1
SQLServer-CDC Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 9.4.1.jre8
MongoDB-CDC MongoDB: 3.12.11 JDBC Driver: 3.12.11
Oracle-CDC Oracle: 12c JDBC Driver: ojdbc8

CDC 增量同步作业需要提前对数据库进行配置,具体的配置方法如下:

# MySQL-CDC 配置

1、修改 MySQL 配置,开启 binlog 功能,并设置 binlog 格式为 row 模式。

binlog 是 MySQL 的二进制日志文件,它记录了数据库中的所有变更操作,Mysql CDC Connector 会通过 binlog 来捕获数据的变化。

参考配置如下:

[mysqld]
 
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
 
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 1
binlog_format     = row
 
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

2、添加权限(如果是非 root 用户)

MySQL Binlog 权限需要三个权限 SELECT, REPLICATION SLAVE, REPLICATION CLIENT:

Select 权限代表允许从表中查看数据。

Replication client 权限代表允许执行show master status,show slave status,show binary logs 命令。

Replication slave 权限代表允许slave主机通过此用户连接master以便建立主从复制关系。

3、执行命令(如果是非 root 用户)

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dws'@'%' IDENTIFIED BY 'dws';

# PostgreSQL-CDC 配置

1、修改 postgresql.conf 配置文件,参考配置如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots 
max_replication_slots = 20
 
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用) 
wal_sender_timeout = 180s

2、开启权限

-- 更改复制标识包含更新和删除之前值
ALTER TABLE table_name  REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功)
select relreplident from pg_class where relname='table_name';

# SQL Server-CDC 配置

1、开启SQL Server CDC的必要条件

  • 需要开启数据库 CDC
  • 需要开启表 CDC
  • 需要开启代理服务
  • 磁盘要有足够的空间,保存日志文件
  • 表必须要有主键或者是唯一索引

2、参考文档:https://support.huaweicloud.com/intl/zh-cn/usermanual-roma/fdi-ug-202009081.html

# MongoDB-CDC 配置

1、开启Mongodb CDC的必要条件

  • MongoDB version: MongoDB version >= 4.0.
  • Cluster deployment: replica sets or sharded clusters
  • Storage Engine: WiredTiger Storage Engine.
  • Permissions:changeStream and read(执行如下语句)
use admin;
db.createRole(
    {
        role: "strole",
        privileges: [{
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
  {
      user: 'stuser',
      pwd: 'stpw',
      roles: [
         { role: 'strole', db: 'admin' }
      ]
  }
);

2、配置 MongoDB 副本集 创建 mongo1.conf,参考配置如下:

# 端口默认 
port = 27017
#数据目录
dbpath = /usr/local/Cellar/mongodb-community/7.0.0/data/db
#日志所在目录
logpath = /usr/local/Cellar/mongodb-community/7.0.0/data/logs/mongodb.log
#日志输出方式
logappend = true
# 在后台启动
fork=true
# 0.0.0.0 表示任意IP均可连接
bind_ip=0.0.0.0
#副本集名称
replSet=rs0

3、启动 MongoDB 节点,参考命令如下:

cd /usr/local/Cellar/mongodb-community/7.0.0/bin
启动命令:
mongod -f mongo.conf

4、初始化副本集

# 连接mongodb
./mongo mongodb://127.0.0.1:27017
use admin
# 设置变量
rs0= {
   _id: "rs0",
   members: [
     {
      _id: 0,
      host: "127.0.0.1:27017"
     },
    ]
 }
# 执行初始化
rs.initiate(rs0)

# Oracle-CDC 配置

1、开启Oracle CDC(LogMiner) 在 Seatunnel 中使用 Oracle CDC 同步数据需为 Oracle 数据库开启 LogMiner(Oracle提供的内置工具)模式的 CDC(Change Data Capture) 功能。请遵循以下步骤:

Oracle为非CDB数据库时的配置 创建空文件目录存放Oracle归档日志和用户表空间。

mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
chown -R oracle /opt/oracle/***

1)启用日志归档

以管理员身份登录并启用Oracle归档日志。

sqlplus /nolog;
 
connect sys as sysdba;
 
alter system set db_recovery_file_dest_size = 10G;
 
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
 
shutdown immediate;
 
startup mount;
 
alter database archivelog;
 
alter database open;
 
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
 
archive log list;

2)用户赋权

以管理员身份登录,创建一个名为logminer_user的帐户,密码为“oracle”,并授予该帐户读取表和日志的权限。

CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
 
CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
 
GRANT CREATE SESSION TO logminer_user;
 
GRANT SELECT ON V_$DATABASE to logminer_user;
 
GRANT SELECT ON V_$LOG TO logminer_user;
 
GRANT SELECT ON V_$LOGFILE TO logminer_user;
 
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
 
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
 
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
 
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
 
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
 
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
 
GRANT LOGMINING TO logminer_user;
 
 
GRANT SELECT ANY TABLE TO logminer_user;
 
GRANT ANALYZE ANY TO logminer_user;

2、SeaTunnel Oracle CDC 配置示例

{
  "env" : {
    "execution.parallelism" : 1,
    "job.mode" : "STREAMING",
    "checkpoint.interval" : 5000
  },
  "source" : [
    {
      "base-url" : "jdbc:oracle:thin:@192.168.2.58:1521:orcl",
      "password" : "oracle",
      "debezium" : {
        "database.oracle.jdbc.timezoneAsRegion" : "false"
      },
      "connection.pool.size" : 5,
      "table-names" : [
        "ORCL.lOGMINER_USER.EAST5_409_GRXDFHZ_POC"
      ],
      "result_table_name" : "customers",
      "database-names" : [
        "ORCL"
      ],
      "schema-names" : [
        "lOGMINER_USER"
      ],
      "source.reader.close.timeout" : 120000,
      "plugin_name" : "Oracle-CDC",
      "username" : "logminer_user"
    }
  ],
  "transform" : [],
  "sink" : [
    {
      "password" : "primeton000000",
      "database" : "ORCL",
      "batch_size" : 1,
      "primary_keys" : [
        "ID"
      ],
      "driver" : "oracle.jdbc.driver.OracleDriver",
      "source_table_name" : "customers",
      "generate_sink_sql" : true,
      "plugin_name" : "Jdbc",
      "user" : "C##DSP6GA_M",
      "url" : "jdbc:oracle:thin:@//192.168.16.96:1521/ORCL",
      "table" : "C##DSP6GA_M.EAST5_409_GRXDFHZ_POC"
    }
  ]
}

# 时间戳关系型数据库增量同步(增量)

基于时间戳模型,实现从源表向目标表增量同步数据,要求:

  • 业务系统表中要必须有一个时间戳字段;
  • 业务系统表中需要有可唯一标识某一行的字段作为查询字段。

# 时间戳关系型数据库到Hive增量同步(增量)

时间戳关系型数据库到Hive增量同步和时间戳关系型数据库增量同步的向导页面类似,不同的是目标表是hive类型的表,可设置部分字段映射同步。

Hive库目标表不存在时,会自动创建目标表,可在模型上设置目标表存储格式,支持ORC、PARQUET、TEXTFILE格式;

Hive库目标表存在时,会自动获取表存储格式,支持获取ORC、PARQUET、TEXTFILE格式,其中TEXTFILE格式分隔符支持默认、TAB、逗号。

← (增量)时间戳StarRocks到Hive增量同步 SQL程序 →