普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • DB数据写入kafka示例
  • 准备数据
  • 新建转换作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# DB数据写入kafka示例

本示例主要演示将关系型数据库中数据写入kafka中。

主要步骤如下:

  • 准备数据
  • 新建转换作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

创建表orderinfo_kafka。

CREATE TABLE `orderinfo` (
  `ordercode` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '订单号',
  `amount` int DEFAULT NULL COMMENT '数量',
  `price` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '价格',
  `manufacturer` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '厂商',
  `createtime` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `createuser` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '创建人',
  `updatetime` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`ordercode`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='订单表';
 

 INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('001', 1000, '5000', 'xingyun', '2018-12-12 00:00:00', 'zhangcong', '2018-12-13 00:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('003', 1500, '7000', '枫禾林木', '2018-12-12 00:00:00', '魏雨', '2018-12-13 12:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('005', 1300, '8000', '天威科技', '2018-12-12 00:00:00', '王林', '2019-12-01 00:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('006', 2000, '5000', '微云科', '2018-12-12 00:00:00', '王林2', '2019-12-01 00:20:11');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('011', 1800, '5500', '00微云科技', '2022-12-31 00:00:00', '魏雨', '2023-01-04 10:11:22');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('013', 1302, '5302', '天威科技', '2024-05-24 00:00:00', '魏雨', '2024-05-23 16:59:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('014', 1300, '5400', '天威科技', '2024-05-22 00:00:00', '魏雨', '2024-05-23 17:09:00');

# 新建转换作业

点击数据加工上的【...】,选择弹出菜单【新建数据加工】,作业名称为:DB2kafka。

# 拖拽图元

依次拖拽输入中的关系型数据库输入、json输入,输出中的关系型数据库输出,依次连线。如下图所示:

image-20241121113150921

# 配置组件属性

1、双击"关系型数据库输入"组件,根据下图所示步骤依次配置数据源、库、SQL语句。

image-20241121112804898

2、双击"JSON输出"组件,【输出方式】选择输出值,输出值字段命名为jsondata;分割数据行数输入1,点击下一步获取字段,列表中的字段是要输出到kafka中的字段,可删减。 image-20241121133404703

image-20241121113005007

3、双击"kafka输出"组件,选择数据源及主题,点击下一步配置字段信息,获取字段中保留json输出中的字段jsondata。 image-20241121113104362

image-20241121113114621

4、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景。

image-20241121134545939

# 查看数据

image-20241121133922075

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

← kafka数据输出到DB示例 RestClient示例 →