# Seatunnel-starRocks到关系型数据库同步示例
本示例主要介绍Seatunnel处理引擎下,使用starRocks到关系型数据库进行单表迁移、多表数据迁移、整库数据迁移的全量同步场景。
单表迁移:将关系型数据源的一张表的数据迁移到另一个数据源。
多表迁移:将关系型数据源的多张表的数据迁移到另一个数据源。
整库迁移:将关系型数据源的所有表的数据迁移到另一个数据源。
主要步骤如下:
# 准备数据
StarRocks 数据库为例。创建两个数据库:dws_source,在数据库 dws_source中创建一个表orderinfo。
drop table if exists `orderinfo`;
CREATE TABLE `orderinfo` (
`ordercode` varchar(32) NOT NULL COMMENT "订单号",
`amount` int(11) NULL COMMENT "数量",
`price` varchar(32) NULL COMMENT "价格",
`manufacturer2` varchar(100) NULL COMMENT "厂商",
`createtime` date NULL COMMENT "创建时间",
`createuser` varchar(32) NULL COMMENT "创建人",
`updatetime` datetime NULL COMMENT "更新时间"
) ENGINE=OLAP
PRIMARY KEY(`ordercode`)
DISTRIBUTED BY HASH(`ordercode`);
drop table if EXISTS userinfo;
create table userinfo
(
usercode VARCHAR(64) not null comment '用户编码',
username VARCHAR(64) comment '用户姓名',
sex VARCHAR(64) comment '性别',
age int(11) comment '年龄',
unitcode VARCHAR(64) comment '所属单位',
deptcode VARCHAR(64) comment '所属部门',
telephone VARCHAR(64) comment '联系方式',
createtime DATE comment '创建时间',
updatetime DATE comment '更新时间'
)ENGINE=OLAP
UNIQUE KEY(`usercode`)
DISTRIBUTED BY HASH(`usercode`);
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('001', 1000, '5000', '星云制造', '2023-10-25', '张无忌4', '2024-01-04 12:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('002', 50022, '1000', '西安金属', '2018-12-13', '张无忌4', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('003', 1500, '7000', '枫禾林木', '2018-12-13', '魏雨', '2024-01-11 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('004', 1000, '75000', '星峰', '2019-01-13', '王林', '2019-01-13 12:23:01');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('005', 1300, '8000', '天威科技', '2018-12-13', '王林', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('006', 2000, '5000', '微云科', '2023-10-25', '王林', '2023-10-25 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('007', 3000, '9001', '西安金属', '2024-12-13', '王强', '2020-12-13 11:11:11');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('008', 12300, '11589', '星峰', '2018-12-13', '王强', '2024-02-04 12:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('009', 5100, '22909', '星云制造', '2018-12-13', '魏雨', '2018-12-13 23:59:59');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('010', 1800, '15900', '天威科技', '2018-12-13', '魏雨', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('011', 1800, '5500', '微云科技zz', '2023-03-01', '魏雨', '2023-03-01 00:00:00');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('10', '2018-12-13', '普元信息', '男', '技术部', '15901214578', '2023-01-23', 25, '王林');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('9', '2018-12-13', '普元信息', '男', '技术部', '15998760132', '2018-12-13', 29, '李建国');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('1', '2018-12-13', '普元信息', '男', '技术部', '18091256632', '2018-12-13', 30, '张聪');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('8', '2018-12-13', '普元信息', '男', '事业部', '18790123567', '2019-12-13', 30, '杨孟');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('4', '2018-12-13', '普元信息', '男', '事业部', '15091345789', '2018-12-13', 25, '刘飞');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('5', '2018-12-13', '普元信息', '女', '售后', '13090012019', '2018-12-13', 25, '许馨');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('6', '2018-12-13', '普元信息', '男', '技术部', '19081234789', '2018-12-13', 24, '蔡亮');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('2', '2018-12-13', '普元信息', '男', '售后', '15389012334', '2018-12-13', 25, '王强');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('3', '2018-12-13', '普元信息', '女', '售后', '15309872351', '2018-12-13', 26, '魏雨');
INSERT INTO userinfo
(usercode, createtime, unitcode, sex, deptcode, telephone, updatetime, age, username)
VALUES('7', '2018-12-13', '普元信息', '男', '事业部', '19087652123', '2018-12-13', 27, '张强');
在PostgreSQL数据库中创建目标schema及目标表
create schema dws_front;
CREATE TABLE dws_front.userinfo (
usercode varchar(192) NOT NULL,
username varchar(192) NULL,
sex varchar(192) NULL,
age int4 NULL,
unitcode varchar(192) NULL,
deptcode varchar(192) NULL,
telephone varchar(192) NULL,
createtime timestamp NULL,
updatetime timestamp NULL,
CONSTRAINT userinfo_pkey PRIMARY KEY (usercode)
);
# 新建通用模板作业
点击资源树节点上的【...】,选择弹出菜单【新建通用模板作业】,填写"模板名称",选择"模板类型",点击【确定】按钮。
# 配置多表迁移属性
第一步,选择来源的"数据源名称"、"数据库名称"、"表",点击【下一步】按钮。
⚠️ 提示:单表迁移时选择一张表,多表迁移时选择多张表,整库迁移时选择全部表。
第二步,选择目标的"数据源名称"、"数据库名称",点击【下一步】按钮。
数据添加模式:当目标表中有数据时,有三种处理方式:
- APPEND_DATA:追加数据
- DROP_DATA:同步前删除数据
- ERROR_WHEN_DATA_EXISTS:数据存在时报告错误
第三步,将源表与目标表的名称进行映射,点击【确定并保存】按钮。
⚠️ 提示:默认来源表与目标表的表名称一样,用户可以修改"目标表名称"。
# 通用配置
在通用配置中可以配置任务优先级、Worker 分组、命名参数、本地参数、超时告警。 修改属性后请务必点击【确定】按钮。
可以参考示例关系型表数据同步示例 中的"通用配置"说明。
# 保存草稿
如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)
可以参考示例关系型表数据同步示例 中的"保存草稿"说明。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。
可以在目标数据库表中看到已经迁移过去的数据库表orderinfo、userinfo及数据。
⚠️ 提示:目标表不存在时会自动创建。
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。
可以参考示例关系型表数据同步示例 中的"提交版本"说明。