# DB数据写入kafka示例
本示例主要演示将关系型数据库中数据写入kafka中。
主要步骤如下:
# 准备数据
创建表orderinfo_kafka。
CREATE TABLE `orderinfo` (
`ordercode` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '订单号',
`amount` int DEFAULT NULL COMMENT '数量',
`price` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '价格',
`manufacturer` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '厂商',
`createtime` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`createuser` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '创建人',
`updatetime` timestamp NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`ordercode`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='订单表';
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('001', 1000, '5000', 'xingyun', '2018-12-12 00:00:00', 'zhangcong', '2018-12-13 00:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('003', 1500, '7000', '枫禾林木', '2018-12-12 00:00:00', '魏雨', '2018-12-13 12:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('005', 1300, '8000', '天威科技', '2018-12-12 00:00:00', '王林', '2019-12-01 00:00:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('006', 2000, '5000', '微云科', '2018-12-12 00:00:00', '王林2', '2019-12-01 00:20:11');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('011', 1800, '5500', '00微云科技', '2022-12-31 00:00:00', '魏雨', '2023-01-04 10:11:22');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('013', 1302, '5302', '天威科技', '2024-05-24 00:00:00', '魏雨', '2024-05-23 16:59:00');
INSERT INTO `dws_front_dev`.`orderinfo` (`ordercode`, `amount`, `price`, `manufacturer`, `createtime`, `createuser`, `updatetime`) VALUES ('014', 1300, '5400', '天威科技', '2024-05-22 00:00:00', '魏雨', '2024-05-23 17:09:00');
# 新建转换作业
点击数据加工上的【...】,选择弹出菜单【新建数据加工】,作业名称为:DB2kafka。
# 拖拽图元
依次拖拽输入中的关系型数据库输入、json输入,输出中的关系型数据库输出,依次连线。如下图所示:
# 配置组件属性
1、双击"关系型数据库输入"组件,根据下图所示步骤依次配置数据源、库、SQL语句。
2、双击"JSON输出"组件,【输出方式】选择输出值,输出值字段命名为jsondata;分割数据行数输入1,点击下一步获取字段,列表中的字段是要输出到kafka中的字段,可删减。
3、双击"kafka输出"组件,选择数据源及主题,点击下一步配置字段信息,获取字段中保留json输出中的字段jsondata。
4、Ctrl+S保存该模型。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景。
# 查看数据
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。