普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • Doris多表同步示例
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# Doris多表同步示例

本示例主要演示从Doris源库中读取两张表userinfo、orderinfo的数据,通过转换组件DefineSinkType组件重新定义sink字段存储类型,将数据写入Doris目标库表中。

注意:
DefineSinkType组件用于重新定义sink字段存储类型,对于目标端 表添加模式 开启自动建表时有效。

主要步骤如下:

  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

创建数据库dws_source作为源库

CREATE DATABASE IF NOT EXISTS dws_source;

创建表 userinfo 和 userinfo 作为源表,并给源表 INSERT 一些数据。

-- ---------------------------------------
-- Table structure for orderinfo
-- ---------------------------------------
DROP TABLE IF EXISTS dws_source.`orderinfo`;

CREATE TABLE dws_source.`orderinfo` (
  `ordercode` varchar(32) NOT NULL COMMENT "订单号",
  `amount` int(11) NULL COMMENT "数量",
  `price` varchar(32) NULL COMMENT "价格",
  `manufacturer` varchar(100) NULL COMMENT "厂商",
  `createtime` datetime NULL COMMENT "创建时间",
  `createuser` varchar(32) NULL COMMENT "创建人",
  `updatetime` datetime NULL COMMENT "更新时间"
) ENGINE=OLAP 
UNIQUE KEY(`ordercode`)
DISTRIBUTED BY HASH(`ordercode`)
PROPERTIES ('replication_num'='1');

-- ----------------------------
-- Records of orderinfo
-- ----------------------------

INSERT INTO dws_source.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('001', 1800, '5900', '天威科技11', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO dws_source.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('002', 1800, '9000', '天威科技11', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO dws_source.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('003', 1800, '5900', '天威科技', '2018-12-13 08:00:00', '魏雨', '2018-12-13 08:00:00');
INSERT INTO dws_source.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('004', 2000, '5000', '微云科技', '2018-12-13 08:00:00', '王林', '2019-12-01 08:20:11');
INSERT INTO dws_source.orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('005', 3000, '666', '西安金属', '2018-12-13 08:00:00', '王强', '2022-12-13 08:00:00');


-- ---------------------------------------
-- Table structure for userinfo
-- ---------------------------------------
DROP TABLE IF EXISTS dws_source.`userinfo`;
CREATE TABLE dws_source.`userinfo` (
    `usercode` varchar(32) NOT NULL COMMENT '用户编码',
    `username` varchar(20) COMMENT '用户姓名',
    `sex` char(3) COMMENT '性别',
    `age` INT COMMENT '年龄',
    `unitcode` varchar(32) COMMENT '所属单位',
    `deptcode` varchar(32) COMMENT '所属部门',
    `telephone` varchar(32) COMMENT '联系方式',
    `createtime` Date COMMENT '创建时间',
    `updatetime` Date COMMENT '更新时间'
)ENGINE=OLAP 
UNIQUE KEY(`usercode`)
DISTRIBUTED BY HASH(`usercode`)
PROPERTIES ('replication_num'='1');

-- ----------------------------
-- Records of userinfo
-- ----------------------------

INSERT INTO dws_source.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('2', '王建国', '男', 20, '普元信息', '售后', '18977889900', '2024-07-03', '2024-07-04');
INSERT INTO dws_source.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('5', '刘飞', '男', 25, '普元信息', '事业部', '15091345789', '2018-12-13', '2018-12-13');
INSERT INTO dws_source.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('3', '王强', '男', 25, '普元信息', '售后', '15389012334', '2018-12-13', '2018-12-13');
INSERT INTO dws_source.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('4', '魏雨', '女', 26, '普元信息', '售后', '15309872351', '2018-12-13', '2018-12-13');
INSERT INTO dws_source.userinfo
(usercode, username, sex, age, unitcode, deptcode, telephone, createtime, updatetime)
VALUES('1', '张聪', '男', 80, '普元信息', '技术部', '18091256632', '2018-12-13', '2018-12-13');

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:DorisSource-DefineSinkType-DorisSink-MultiTable。

# 拖拽图元

依次拖拽数据源中的Doris Source组件、转换中的DefineSinkType组件和目标中的Doris Sink组件,依次连线。如下图所示:

doris

# 配置组件属性

1、双击"Doris Source"组件,根据下图所示步骤依次配置。

doris

2、双击"DefineSinkType"组件,根据下图所示步骤依次配置。

DefineSinkType

3、双击"Doris Sink"组件,根据下图所示步骤依次配置。

Doris

4、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

Doris

# 查看数据

从SQL客户端中查看源库dws_source的orderinfo、userinfo表数据已经同步到目标库dws_sink的orderinfo、userinfo表中。

目标表orderinfo结果数据

doris

目标表userinfo结果数据

doris

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

doris

← Doris同步示例 Hive读取数据示例 →