# RocketMQ Sink 组件使用说明
# 组件说明
RocketMQ Sink 连接器完全支持同步/异步发送、ACL认证、消息Tag及自定义分区键,核心优势是默认采用两阶段提交(2PC)保障Exactly-Once语义。
# 配置项说明
| 配置名称 | 数据类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| 节点名称 | String | 是 | RocketMQ | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 节点编码 | String | 是 | 自动生成 | 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 选择数据源 | String | 是 | - | 从下拉选项中选择列出的当前项目已经关联的数据源。 |
| Topic | String | 是 | - | 主题名称,只可输入一个。 |
| 消息标签 | String | 否 | - | 消息标签(Tag),用于过滤消费。 |
| 生产组ID | String | 否 | - | 生产组ID。 |
| 分区键 | String | 否 | - | 指定字段值作为消息Key,决定发送到哪个分区(哈希取模)。从下拉列表中选择上游读取到的字段名称作为分区键。 |
| 分隔符 | String | 是 | , | 仅当format = "text"时生效,字段分隔符。 |
| 数据格式 | String | 是 | - | 有两种:json、text |
| 同步发送 | Boolean | 否 | 否 | 是否启用同步发送。 否:异步发送(高吞吐); 是:同步发送(等待Broker确认)。 |
| 可选参数 | 否 | - | 其他参数,用户可以根据需求进行配置。 |
# FAQ
Q1:如何实现类似 Kafka 的“Exactly-Once”语义?
A:无需额外配置。SeaTunnel RocketMQ Sink 默认集成两阶段提交(2PC),只要启用 Checkpoint(checkpoint.interval > 0)即自动生效。
Q2:异步发送丢消息怎么办? A:
- 对可靠性要求高:设置
producer.send.sync = true,等待Broker确认。 - 仍需高吞吐:保持异步,但需确保 RocketMQ Broker 集群稳定,且
producer.group在NameServer正常注册。
Q3:数据写入性能瓶颈如何优化?
- 调大
parallelism(Sink并发度) - 使用异步发送(producer.send.sync = false)
- 确保上游 Source 吞吐量匹配
Q4:partition.key.fields 不生效?
- 检查字段名是否与上游 schema 定义完全一致(大小写敏感)
- 确认上游数据该字段非空;若字段全为 null ,哈希值恒定导致分区倾斜
Q5:ACL认证失败?
- 确保
access.key和secret.key正确 - RocketMQ 服务端版本需支持ACL(4.4.0+)
- 自建集群需在
broker.conf开启aclEnable=true