# 读取kafka中的json数据输出到DB示例
本示例主要演示读取kafka中的json数据输出到关系型数据库。
主要步骤如下:
# 准备数据
创建表orderinfo_kafka。
CREATE TABLE `orderinfo_kafka` (
`ordercode` varchar(32) NOT NULL COMMENT '订单号',
`amount` int DEFAULT NULL COMMENT '数量',
`price` varchar(32) DEFAULT NULL COMMENT '价格',
`manufacturer` varchar(32) DEFAULT NULL COMMENT '厂商',
`createtime` date DEFAULT NULL COMMENT '创建时间',
`createuser` varchar(32) DEFAULT NULL COMMENT '创建人',
`updatetime` timestamp NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`ordercode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='订单表';
# 新建转换作业
点击数据加工上的【...】,选择弹出菜单【新建数据加工】,作业名称为:kafka数据输出到DB示例。
# 拖拽图元
依次拖拽输入中的kafka输入、json输入,输出中的关系型数据库输出,依次连线。如下图所示:
# 配置组件属性
1、双击"kafka输入"组件,根据下图所示步骤依次配置。
auto.offset.reset参数默认值是lastest(从最新的记录开始消费),若需要从最早记录消费,则可修改值为earliest。
2、双击"JSON输入"组件,【是否将JSON源定义在一个字段中】选择是,源字段选择上游节点kafka输入中的字段名称。
点击下一步,内容规则配置页面默认,点击下一步,字段配置页面设置如下图,路径为kafka中json各字段的jsonpath。
3、双击"关系型数据库输出"组件,选择数据源、库及表,并配置字段映射信息,开启定时刷新并设置刷新间隔。
4、Ctrl+S保存该模型。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景。
以上模型为流式作业,运行成功后作业会一直处于运行中,若要停止可手动点击【停止】按钮。
此时往kafka的kafka2db中插入消息,作业运行中,数据会实时写入关系型表orderinfo_kafka中。
# 查看数据
通过图中操作查看数据情况
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。