普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • 读取kafka中的json数据输出到DB示例
  • 准备数据
  • 新建转换作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# 读取kafka中的json数据输出到DB示例

本示例主要演示读取kafka中的json数据输出到关系型数据库。

主要步骤如下:

  • 准备数据
  • 新建转换作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

创建表orderinfo_kafka。

CREATE TABLE `orderinfo_kafka` (
  `ordercode` varchar(32) NOT NULL COMMENT '订单号',
  `amount` int DEFAULT NULL COMMENT '数量',
  `price` varchar(32) DEFAULT NULL COMMENT '价格',
  `manufacturer` varchar(32) DEFAULT NULL COMMENT '厂商',
  `createtime` date DEFAULT NULL COMMENT '创建时间',
  `createuser` varchar(32) DEFAULT NULL COMMENT '创建人',
  `updatetime` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`ordercode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='订单表';
 

# 新建转换作业

点击数据加工上的【...】,选择弹出菜单【新建数据加工】,作业名称为:kafka数据输出到DB示例。

# 拖拽图元

依次拖拽输入中的kafka输入、json输入,输出中的关系型数据库输出,依次连线。如下图所示:

image-20241121095407027

# 配置组件属性

1、双击"kafka输入"组件,根据下图所示步骤依次配置。

auto.offset.reset参数默认值是lastest(从最新的记录开始消费),若需要从最早记录消费,则可修改值为earliest。

image-20241121101209247

2、双击"JSON输入"组件,【是否将JSON源定义在一个字段中】选择是,源字段选择上游节点kafka输入中的字段名称。

点击下一步,内容规则配置页面默认,点击下一步,字段配置页面设置如下图,路径为kafka中json各字段的jsonpath。

image-20241121101119367

image-20241121101230797

3、双击"关系型数据库输出"组件,选择数据源、库及表,并配置字段映射信息,开启定时刷新并设置刷新间隔。

image-20241121102447649

image-20241121101710106

4、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景。

以上模型为流式作业,运行成功后作业会一直处于运行中,若要停止可手动点击【停止】按钮。

此时往kafka的kafka2db中插入消息,作业运行中,数据会实时写入关系型表orderinfo_kafka中。

image-20241121103417249

image-20241121103539574

# 查看数据

通过图中操作查看数据情况

image-20241121103613991

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

← 生成记录组件-示例3 DB数据写入kafka示例 →