普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • RocketMQ Source组件使用说明
  • 组件说明
  • 配置项说明
  • FAQ

# RocketMQ Source组件使用说明

  • 组件说明
  • 配置项说明
  • 使用示例
    • RocketMQ往关系型数据库输入数据
  • FAQ

# 组件说明

RocketMQ Source连接器。用于从 RocketMQ 读取数据。

# 配置项说明

配置名称 数据类型 是否必填 默认值 描述
节点名称 String 是 RocketMQ 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
节点编码 String 是 自动生成 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
选择数据源 String 是 - 从下拉选项中选择列出的当前项目已经关联的数据源。
Topics String 是 - Topic名称,多个用逗号分隔(如"tpc1,tpc2") 。
消息标签 String 否 - 消息标签。
消费组ID String 否 - 消费组ID。
消费模式 String 否 - 有五种选项:
CONSUME_FROM_LAST_OFFSET
CONSUME_FROM_FIRST_OFFSET
CONSUME_FROM_GROUP_OFFSETS
CONSUME_FROM_TIMESTAMP
CONSUME_FROM_SPECIFIC_OFFSETS
动态发现间隔 String 否 -1 动态发现间隔。
时间戳 Long 否 - 消费模式为 CONSUME_FROM_TIMESTAMP 时的时间戳。
偏移量 Map 否 - 消费模式为 CONSUME_FROM_SPECIFIC_OFFSETS 时的 Offset 映射,格式:{topic-partition = offset}。
批处理大小 Int 否 100 一次读取数据大小,单位为 MB。
跳过解析错误 Boolean 否 否 是否跳过解析错误,开启时容错。
是否定期提交 String 否 - 定期提交 。
数据格式 String 是 - 数据格式,支持json、text。
text格式可用分隔符(field.delimiter)自定义分隔符。
数据结构 List 是 - 定义数据的字段名称和类型(JSON格式下通常需要)。 详见FAQ-Q1
可选参数 否 - 其他参数,用户可以根据需求进行配置。

# FAQ

Q1:数据结构都包含什么,都支持什么类型?

  • 包括字段名称和字段类型:
  • 字段类型:
    • string
    • boolean
    • tinyint
    • smallint
    • int
    • bigint
    • float
    • double
    • decimal
    • bytes
    • date
    • timestamp
    • map (map<string, int>,map<string, string>,map<string, boolean>,map<string, tinyint>,map<string, smallint>,map<string, bigint>,map<string, float>,map<string, double>,map<string, decimal>,map<string, date>,map<string, time>,map<string, timestamp>,map<string, null>,map<string, array>,map<string, map> )
    • array

Q:如何实现类似Kafka的“earliest”效果?

A:设置start.mode = "CONSUME_FROM_FIRST_OFFSET"。

Q:消费慢怎么办?

A:适当调大batch.size(如500~1000),或增加parallelism并行度。

Q:消息解析失败导致任务卡死?

A:升级到2.3.10+版本,设置skip.parse.errors = true。

Q:动态发现分区不生效?

A:检查partition.discovery.interval.millis是否设置为大于0的值(单位毫秒)。

← Redis StarRocks →