普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • 关系型数据库到目标RocketMQ数据同步
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# 关系型数据库到目标RocketMQ数据同步

本示例主要演示从Mysql读取数据,写入RocketMQ中。

主要步骤如下:

  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

  1. 创建 MySQL 源数据库 test,在源数据库 test 中创建一个表 rockmq_demo。
drop table if exists `rockmq_demo`;
CREATE TABLE `rockmq_demo` (
  `name` longtext,
  `age` int
);

INSERT INTO rockmq_demo (name, age) VALUES('zhangsan', 18);

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:RocketMQ_Sink。

# 拖拽图元

依次拖拽数据源中的关系型数据库 Source组件和目标中的RocketMQ Sink组件,依次连线。如下图所示:

img.png

# 配置组件属性

1、双击"关系型数据库 Source"组件,根据下图所示步骤依次配置。

img_1.png

2、双击"RocketMQ Sink"组件,根据下图所示步骤依次配置。

img_2.png

3、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

img_3.png

img_4.png

# 查看数据

从服务器中查看写入RocketMQ的数据。

1.找到Broker名称和队列信息,定位消息的存储位置(队列与偏移量):
  ./mqadmin topicStatus -n localhost:9876 -t TopicTest
  如下图查出来的BrokerName=h2,队列3的最大偏移量是1,最新消息的偏移量应该是 0 

img_5.png

2.查询消息内容:
  ./mqadmin queryMsgByOffset -n localhost:9876 -b h2 -i 3 -o 0 -t TopicTest
  如下图标识的地方就是插入消息体路径文件,使用cat命令即可查看消息内容

img_6.png

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。