普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • 读取表数据写入格式为avro写入到Kafka主题中
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# 读取表数据写入格式为avro写入到Kafka主题中

本示例主要演示从Mysql表中读取数据,写入格式为Avro,写入Kafka的test_avro_data主题中。

主要步骤如下:

  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

创建数据库dws_source作为源库

CREATE DATABASE IF NOT EXISTS dws_source;

创建表 t_user 作为源表,并给源表 INSERT 一些数据。

-- ---------------------------------------
-- Table structure for t_user
-- ---------------------------------------
DROP TABLE IF EXISTS dws_source.`t_user`;

CREATE TABLE dws_source.`t_user` (
  `id` int NOT NULL,
  `name` varchar(400) NOT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
);

-- ----------------------------
-- Records of t_user
-- ----------------------------

INSERT INTO dws_source.t_user(id, name, age)VALUES(1, '张聪', 20);
INSERT INTO dws_source.t_user(id, name, age)VALUES(2, '王建国', 20);
INSERT INTO dws_source.t_user(id, name, age)VALUES(3, '王强', 20);
INSERT INTO dws_source.t_user(id, name, age)VALUES(4, '魏雨', 18);
INSERT INTO dws_source.t_user(id, name, age)VALUES(5, '刘飞', 18);

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:mysql2kafka-avro。

# 拖拽图元

依次拖拽数据源中的关系型数据库 Source组件和目标中的Kafka Sink组件,依次连线。如下图所示:

kafka

# 配置组件属性

1、双击"关系型数据库 Source"组件,根据下图所示步骤依次配置。

kafka

2、双击"Kafka Sink"组件,根据下图所示步骤依次配置。

kafka

3、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

kafka

# 查看数据

使用Kafka可视化工具查看数据(Kafka可视化工具可自行下载)。

或参考读取Kafka-avro格式数据示例将结果数据打印到控制台

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

kafka