# RocketMQ Source组件使用说明
# 组件说明
RocketMQ Source连接器。用于从 RocketMQ 读取数据。
# 配置项说明
| 配置名称 | 数据类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| 节点名称 | String | 是 | RocketMQ | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 节点编码 | String | 是 | 自动生成 | 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 选择数据源 | String | 是 | - | 从下拉选项中选择列出的当前项目已经关联的数据源。 |
| Topics | String | 是 | - | Topic名称,多个用逗号分隔(如"tpc1,tpc2") 。 |
| 消息标签 | String | 否 | - | 消息标签。 |
| 消费组ID | String | 否 | - | 消费组ID。 |
| 消费模式 | String | 否 | - | 有五种选项: CONSUME_FROM_LAST_OFFSET CONSUME_FROM_FIRST_OFFSET CONSUME_FROM_GROUP_OFFSETS CONSUME_FROM_TIMESTAMP CONSUME_FROM_SPECIFIC_OFFSETS |
| 动态发现间隔 | String | 否 | -1 | 动态发现间隔。 |
| 时间戳 | Long | 否 | - | 消费模式为 CONSUME_FROM_TIMESTAMP 时的时间戳。 |
| 偏移量 | Map | 否 | - | 消费模式为 CONSUME_FROM_SPECIFIC_OFFSETS 时的 Offset 映射,格式:{topic-partition = offset}。 |
| 批处理大小 | Int | 否 | 100 | 一次读取数据大小,单位为 MB。 |
| 跳过解析错误 | Boolean | 否 | 否 | 是否跳过解析错误,开启时容错。 |
| 是否定期提交 | String | 否 | - | 定期提交 。 |
| 数据格式 | String | 是 | - | 数据格式,支持json、text。 text格式可用分隔符(field.delimiter)自定义分隔符。 |
| 数据结构 | List | 是 | - | 定义数据的字段名称和类型(JSON格式下通常需要)。 详见FAQ-Q1 |
| 可选参数 | 否 | - | 其他参数,用户可以根据需求进行配置。 |
# FAQ
- 包括字段名称和字段类型:
- 字段类型:
- string
- boolean
- tinyint
- smallint
- int
- bigint
- float
- double
- decimal
- bytes
- date
- timestamp
- map (map<string, int>,map<string, string>,map<string, boolean>,map<string, tinyint>,map<string, smallint>,map<string, bigint>,map<string, float>,map<string, double>,map<string, decimal>,map<string, date>,map<string, time>,map<string, timestamp>,map<string, null>,map<string, array>,map<string, map>
)
- array
- string
Q:如何实现类似Kafka的“earliest”效果?
A:设置start.mode = "CONSUME_FROM_FIRST_OFFSET"。
Q:消费慢怎么办?
A:适当调大batch.size(如500~1000),或增加parallelism并行度。
Q:消息解析失败导致任务卡死?
A:升级到2.3.10+版本,设置skip.parse.errors = true。
Q:动态发现分区不生效?
A:检查partition.discovery.interval.millis是否设置为大于0的值(单位毫秒)。