# PDI-时间戳关系型数据库到Hive增量同步示例
本示例主要介绍Primeton DI处理引擎下,使用时间戳关系型数据库到Hive增量同步进行多表数据迁移的增量同步场景。
时间戳增量同步:实现原理是从目标表获取最大时间,查出源表中大于这个最大时间的所有数据,从而在目标表进行 Insert 操作。
主要步骤如下:
# 准备初始数据
以 MySQL 数据库为例。创建数据库:dws_source_dev,在数据库 dws_source_dev 中创建一个表 orderinfo。
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for t_xueji -- ---------------------------- drop table if EXISTS orderinfo; create table orderinfo ( ordercode VARCHAR(32) not null comment '订单号', amount int(11) comment '数量', price VARCHAR(32) comment '价格', manufacturer VARCHAR(32) comment '厂商', createtime DATE comment '创建时间', createuser VARCHAR(32) comment '创建人', updatetime timestamp comment '更新时间' ) ENGINE=INNODB DEFAULT CHARSET=utf8; alter table orderinfo add constraint ORDER_PRIMARYKEY primary key (ORDERCODE); -- ---------------------------- -- Records of t_xueji -- ---------------------------- BEGIN; insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) values ('001', 1000, '5000', 'xingyun', '2018-12-13', 'zhangcong', '2018-12-13 00:00:00'); insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) values ('002', 500, '1000', '西安金属', '2018-12-13', '王强', '2018-12-15 00:00:00'); insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) values ('003', 1500, '7000', '枫禾林木', '2018-12-13', '魏雨', '2018-12-13 12:00:00'); insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) values ('004', 20000, '75000', '星峰', '2018-12-13', '王林', '2018-12-13 00:00:00'); insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) values ('005', 1300, '8000', '天威科技', '2018-12-13', '王林', '2019-12-01 00:00:00'); COMMIT; SET FOREIGN_KEY_CHECKS = 1;
创建hive数据库:pdi_demo。
CREATE DATABASE pdi_demo;
# 新建通用模板作业
点击资源树节点上的【...】,选择弹出菜单【新建通用模板作业】,填写"模板名称",选择"模板类型",点击【确定】按钮。
# 配置表名映射
- 选择来源和目标表的"数据源"、"库名称"。
⚠️ 提示:单表迁移时选择一张表,多表迁移时选择多张表,整库迁移时选择全部表。
- 在【源表名称】下orderinfo表,点击【手动映射】按钮。
在弹出的【手动映射】框中,点击“同名映射”按钮会生成目标表名称。
当目标表存在时,存储类型不可选,会读取库中目标表的类型;
当目标表不存在时,模型运行后会创建目标表,可设置存储类型,默认存储类型为ORC,可选类型:ORC、PARQUET、TEXTFILE。
⚠️ 提示:默认来源表与目标表的表名称一样,用户可以修改"目标表名称"。
点击确定后,在表名称映射列表中可生成映射信息,可设置字段分区及分割文件。
字段分区:设置该值,可生成hive分区表,可设置多个分区,分区字段有先后顺序。
分割文件:目标表以文件形式存储数据,设置该值可指定每个文件中包含的行数。
点击下一步,使用【自动同名映射】设置字段映射,当字段名不匹配时,可使用手动映射建立映射关系;在“源表字段”和“目标表字段”选择时间戳字段作为查询条件。
# 通用配置
在通用配置中可以配置任务优先级、Worker 分组、命名参数、本地参数、超时告警。 修改属性后请务必点击【确定】按钮。
可以参考示例关系型表数据同步示例 中的"通用配置"说明。
# 指标日志
在指标日志中可以设置是否启用指标日志,当开启后,可以在【作业调度-【任务实例】中使用“查看数据量”统计数据量。
# 保存草稿
如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)
可以参考示例关系型表数据同步示例 中的"保存草稿"说明。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。
上述模型中目标表不存在,故第一次运行时会自动创建目标表,且会把源表中数据全部同步。
在MySQL数据库的dws_source_dev
库中再插入以下数据:
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('006', 2000, '5000', '微云科', '2018-12-13', '王林', '2019-12-01 00:20:11');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('007', 3000, '9000', '西安金属', '2018-12-13', '王强', '2022-12-13 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('008', 2300, '11589', '星峰', '2018-12-13', '王强', '2020-01-01 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('009', 5100, '12909', '星云制造', '2018-12-13', '魏雨', '2018-12-13 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('010', 1800, '5900', '天威科技', '2018-12-13', '魏雨', '2018-12-13 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('011', 1800, '5500', '微云科技zz', '2023-01-01', '魏雨', '2023-01-04 10:11:22');
再次运行模型成功,查看目标表中只会同步 updatetime> '2019-12-01 00:00:00'的数据。同理,当下次再运行模型时,只会同步 updatetime> '2023-01-04 10:11:22'即大于最大时间戳的数据。
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。
可以参考示例关系型表数据同步示例 中的"提交版本"说明。