普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • RocketMQ Sink 组件使用说明
  • 组件说明
  • 配置项说明
  • FAQ

# RocketMQ Sink 组件使用说明

  • 组件说明
  • 配置项说明
  • 使用示例
    • 关系型数据库往RocketMQ输入数据
  • FAQ

# 组件说明

RocketMQ Sink 连接器完全支持同步/异步发送、ACL认证、消息Tag及自定义分区键,核心优势是默认采用两阶段提交(2PC)保障Exactly-Once语义。

# 配置项说明

配置名称 数据类型 是否必填 默认值 描述
节点名称 String 是 RocketMQ 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
节点编码 String 是 自动生成 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
选择数据源 String 是 - 从下拉选项中选择列出的当前项目已经关联的数据源。
Topic String 是 - 主题名称,只可输入一个。
消息标签 String 否 - 消息标签(Tag),用于过滤消费。
生产组ID String 否 - 生产组ID。
分区键 String 否 - 指定字段值作为消息Key,决定发送到哪个分区(哈希取模)。从下拉列表中选择上游读取到的字段名称作为分区键。
分隔符 String 是 , 仅当format = "text"时生效,字段分隔符。
数据格式 String 是 - 有两种:json、text
同步发送 Boolean 否 否 是否启用同步发送。
否:异步发送(高吞吐);
是:同步发送(等待Broker确认)。
可选参数 否 - 其他参数,用户可以根据需求进行配置。

# FAQ

Q1:如何实现类似 Kafka 的“Exactly-Once”语义?

A:无需额外配置。SeaTunnel RocketMQ Sink 默认集成两阶段提交(2PC),只要启用 Checkpoint(checkpoint.interval > 0)即自动生效。

Q2:异步发送丢消息怎么办? A:

  • 对可靠性要求高:设置producer.send.sync = true,等待Broker确认。
  • 仍需高吞吐:保持异步,但需确保 RocketMQ Broker 集群稳定,且producer.group在NameServer正常注册。

Q3:数据写入性能瓶颈如何优化?

  • 调大parallelism(Sink并发度)
  • 使用异步发送(producer.send.sync = false)
  • 确保上游 Source 吞吐量匹配

Q4:partition.key.fields 不生效?

  • 检查字段名是否与上游 schema 定义完全一致(大小写敏感)
  • 确认上游数据该字段非空;若字段全为 null ,哈希值恒定导致分区倾斜

Q5:ACL认证失败?

  • 确保access.key和secret.key正确
  • RocketMQ 服务端版本需支持ACL(4.4.0+)
  • 自建集群需在broker.conf开启aclEnable=true

← Redis StarRocks →