普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • 流查询示例
  • 准备数据
  • 新建批量作业
  • 画布中拖入关系型数据库输入图元、文本文件输入图元、流查询图元、文本文件输出图元
  • 配置"关系型数据库输入"组件属性
  • 配置"文本文件输入"组件属性
  • 配置"流查询"组件属性
  • 配置"文本文件输出"组件属性
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# 流查询示例

本示例主要介绍 流查询 的使用方法。

该场景Mysql中存入订单数据,文件中存入订单快递数据,先读取Mysql数据和文件,通过流查询组件将对应的快递数据添加到Mysql的数据后面,最后输出到文本文件中。主要步骤如下:

  • 准备数据
  • 新建批量作业
  • 画布中拖入关系型数据库输入图元、文本文件输入图元、流查询图元、文本文件输出图元
  • 配置"关系型数据库输入"组件属性
  • 配置"文本文件输入"组件属性
  • 配置"流查询"组件属性
  • 配置"文本文件输出"组件属性
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# 准备数据

在Mysql上执行创建订单主表orders_info并插入数据:

-- 订单主表
CREATE TABLE `orders_info` (
    `order_id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '订单ID',
    `order_no` VARCHAR(32) NOT NULL COMMENT '订单编号',
    `user_id` BIGINT(20) NOT NULL COMMENT '用户ID',
    `user_name` VARCHAR(50) NOT NULL COMMENT '用户名',
    `total_amount` DECIMAL(10,2) NOT NULL COMMENT '订单总金额',
    `order_status` TINYINT(4) NOT NULL DEFAULT 0 COMMENT '订单状态:0-待付款 1-已付款 2-已发货 3-已完成 4-已取消',
    `pay_type` TINYINT(4) DEFAULT NULL COMMENT '支付方式:1-微信 2-支付宝 3-银行卡',
    `pay_time` DATETIME DEFAULT NULL COMMENT '支付时间'
    PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单主表';

-- 插入订单主数据
INSERT INTO `orders_info` (`order_no`, `user_id`, `user_name`, `total_amount`, `order_status`, `pay_type`, `pay_time`) VALUES
('ORD202401010001', 1001, '张三', 299.00, 3, 1, '2024-01-01 10:30:00'),
('ORD202401010002', 1002, '李四', 1599.50, 2, 2, '2024-01-01 14:20:00'),
('ORD202401020001', 1003, '王五', 89.90, 1, 3, '2024-01-01 02:20:00'),
('ORD202401020002', 1001, '张三', 459.00, 3, 1, '2024-01-02 11:45:00'),
('ORD202401030001', 1004, '赵六', 2399.00, 0, 3, '2024-01-02 10:15:00');

在本地创建/home/DI/files/order_extend.csv并插入数据:

order_no,receiver_address,receiver_name,receiver_phone,remark,client_source,user_grade,company,delivery_man,tracking_no,invoice_title,invoice_type
ORD202401010001,北京市朝阳区XX街道XX号,张三,13800138001,请尽快发货,急用,APP,VIP,顺丰速运,王师傅,SF1234567890,张三个人,1
ORD202401010002,上海市浦东新区XX路XX号,李四,13800138002,,PC,NORMAL,中通快递,,ZTO9876543210,上海科技有限公司,2
ORD202401020001,广州市天河区XX路XX号,王五,13800138003,放在快递柜,小程序,,,,,,
ORD202401020002,北京市朝阳区XX街道XX号,张三,13800138001,,APP,VIP,京东快递,,JD5555555555,,1
ORD202401030001,深圳市南山区XX科技园XX栋,赵六,13800138004,企业采购,需要合同,PC,ENTERPRISE,,,,深圳科技有限公司,2

文件内容如下:

说明:
Mysql表orders_info中的order_no字段(订单编号)和order_extend.csv中的order_no进行关联。

# 新建批量作业

点击资源树节点上的【...】,选择弹出菜单【新建批量作业】。作业名称为:"StreamingLookup",选择作业类型:"转换"。

# 画布中拖入关系型数据库输入图元、文本文件输入图元、流查询图元、文本文件输出图元

依次拖拽输入中的关系型数据库输入组件和文本文件输入组件、查询中的流查询组件、输出中的文本文件输出组件,依次连线。如下图所示:

# 配置"关系型数据库输入"组件属性

选择Mysql数据源,然后选择刚才执行“订单主表”sql语句的数据库,最后点击获取SQL查询。

在数据库浏览中输入“orders”进行筛选,并选择订单主表,最后点击确定。

关系型数据库输入最终配置如下:

# 配置"文本文件输入"组件属性

选择文本文件数据源,并填写文件路径为:/home/DI/files/order_extend.csv。

设置文件格式为CSV,分隔符为","。

点击获取字段,并点击确定。

# 配置"流查询"组件属性

  • 选择查询节点名称:该组件会将查询节点组件的数据缓存到内存中,当过来数据时去内存中查找数据,所以该节点选择“文本文件输入”。
  • 查询关键字:点击获取字段,原字段是Mysql表中的字段,查询字段为文件中的字段,所以只保留 order_no订单编号字段进行关联即可。
  • 查询返回字段及值:点击获取字段,获取通过查询需要返回的字段(文件中的字段),可以根据需要进行设置,本示例返回了6个字段。

设置查询关键字:

设置查询返回字段及值:

# 配置"文本文件输出"组件属性

选择HDFS数据源,输入文件路径为/bigdata/csv/stream-lookuo-samples,并设置扩展名为csv,最后点击下一步。

设置分隔符为逗号",",点击获取字段,字段中包含Mysql表字段和流查询返回字段。最后点击确定。

# 通用配置

在通用配置中可以配置任务优先级、Worker 分组、命名参数、本地参数、超时告警。

可以参考示例关系型表数据同步示例 中的"通用配置"说明。

# 保存草稿

如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)

可以参考示例关系型表数据同步示例 中的"保存草稿"说明。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

查看HDFS系统的/bigdata/csv/stream-lookuo-samples.csv文件,包含Mysql字段和流查询返回的字段。

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

可以参考示例关系型表数据同步示例 中的"提交版本"说明。

← HBase查询示例 Excel输入-读取HDFS文件 →