普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • Mysql多表数据同步到StarRocks
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# Mysql多表数据同步到StarRocks

本示例主要演示从MYSQL源库中读取两张表userinfo、orderinfo的数据,通过转换组件Copy组件配置转换规则,将字段复制到新字段,将数据写入StarRocks目标库表中。

主要步骤如下:

  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

创建数据库dws_source作为源库

CREATE DATABASE IF NOT EXISTS dws_source;

创建表 userinfo 和 userinfo 作为源表,并给源表 INSERT 一些数据。

-- ---------------------------------------
-- Table structure for orderinfo
-- ---------------------------------------

DROP TABLE IF EXISTS `dws_source`.`orderinfo`;

CREATE TABLE `dws_source`.`orderinfo` (
  `ordercode` varchar(32) NOT NULL COMMENT '订单号',
  `amount` int DEFAULT NULL COMMENT '数量',
  `price` varchar(32) DEFAULT NULL COMMENT '价格',
  `manufacturer` varchar(32) DEFAULT NULL COMMENT '制造商',
  `createtime` TIMESTAMP  DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `createuser` varchar(32) DEFAULT NULL COMMENT '创建人',
  `updatetime` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`ordercode`)
) COMMENT='订单信息表';

-- ----------------------------
-- Records of orderinfo
-- ----------------------------

INSERT INTO `dws_source`.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('001', 1800, '5900', '天威科技11', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO `dws_source`.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('002', 1800, '9000', '天威科技11', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO `dws_source`.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('003', 1800, '5900', '天威科技', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO `dws_source`.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('004', 2000, '5000', '微云科技', '2018-12-13 08:00:00', '王林', '2019-12-01 08:20:11');
INSERT INTO `dws_source`.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('005', 3000, '666', '西安金属', '2018-12-13 08:00:00', '王强', '2022-12-13 08:00:00');


-- ---------------------------------------
-- Table structure for userinfo
-- ---------------------------------------
DROP TABLE IF EXISTS `dws_source`.`userinfo`;

CREATE TABLE `dws_source`.`userinfo` (
    `usercode` varchar(32) NOT NULL COMMENT '用户编码',
    `username` varchar(20) COMMENT '用户姓名',
    `sex` char(3) COMMENT '性别',
    `age` INT COMMENT '年龄',
    `unitcode` varchar(32) COMMENT '所属单位',
    `deptcode` varchar(32) COMMENT '所属部门',
    `telephone` varchar(32) COMMENT '联系方式',
    `createtime` Date COMMENT '创建时间',
    `updatetime` Date COMMENT '更新时间',
    PRIMARY KEY (`usercode`)
)COMMENT='用户信息表';


-- ----------------------------
-- Records of userinfo
-- ----------------------------

INSERT INTO `dws_source`.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('2', '王建国', '男', 20, '普元信息', '售后', '18977889900', '2024-07-03', '2024-07-04');
INSERT INTO `dws_source`.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('5', '刘飞', '男', 25, '普元信息', '事业部', '15091345789', '2018-12-13', '2018-12-13');
INSERT INTO `dws_source`.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('3', '王强', '男', 25, '普元信息', '售后', '15389012334', '2018-12-13', '2018-12-13');
INSERT INTO `dws_source`.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('4', '魏雨', '女', 26, '普元信息', '售后', '15309872351', '2018-12-13', '2018-12-13');
INSERT INTO `dws_source`.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('1', '张聪', '男', 80, '普元信息', '技术部', '18091256632', '2018-12-13', '2018-12-13');

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:JDBCSource-Copy-StarRocksSink-MultiTable。

# 拖拽图元

依次拖拽数据源中的关系型数据库 Source组件、转换中的Copy组件和目标中的StarRocks Sink组件,依次连线。如下图所示:

doris

# 配置组件属性

1、双击"关系型数据库 Source"组件,根据下图所示步骤依次配置。

Jdbc

2、双击"Copy"组件,根据下图所示步骤依次配置。

Copy

3、双击"StarRocks Sink"组件,根据下图所示步骤依次配置。

StarRocks

4、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

Doris

# 查看数据

从SQL客户端中查看源库dws_source的orderinfo、userinfo表数据已经同步到目标库dws_target的orderinfo、userinfo表中。

目标表orderinfo结果数据

starRocks

目标表userinfo结果数据

starRocks

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

doris

← SQL插入数据 CDC同步关系型数据库示例 →