普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • StarRocks Sink组件使用说明
  • 组件说明
  • 配置项说明
  • 关键功能与原理
  • 性能调优
  • FAQ

# StarRocks Sink组件使用说明

  • 组件说明
  • 配置项说明
  • 使用示例
    • StarRocks数据库同步
    • Mysql多表数据同步到StarRocks
  • FAQ

# 组件说明

用于将数据写入 StarRocks 数据库。

提示

点击对应表的【目标表参数】按钮展开的配置项,完成配置后必须点击弹窗底部的【保存配置】按钮才能保存各自表的配置参数。

# 配置项说明

配置名称 数据类型 是否必填 默认值 描述
节点名称 String 是 StarRocks 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
节点编码 String 是 自动生成 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。
选择数据源 String 是 - 从下拉选项中选择列出的当前项目已经关联的数据源。
选择库 String 是 - 从下拉列表中选择数据库。
表添加模式 String 否 表不存在时创建 有四种:
- 重新创建表
- 表不存在时创建
- 表不存在时报告错误
- 忽略对表的处理
选择表 String 是 - 从下拉列表中选择数据表。
数据添加模式 String 否 追加数据 支持四种模式:
1、追加数据:数据统一追加到目标表中;
2、同步前删除数据:先清空目标表数据,然后再给目标表添加数据;
3、当存在数据时,报告错误:当目标表存在相同数据时,报告错误,不会添加数据。
4、自定义处理:用户可以自定义写SQL的方式进行处理。
自定义SQL 当「数据添加模式」为「自定义处理」时,执行的自定义SQL(如TRUNCATE TABLE)。
大小写转换模式 String 否 不转换 支持三种配置:
- 不转换:目标表与来源表的表名、字段名大小写一致;
- 转大写:将目标表的表名、字段名转换为大写;
- 转小写:将目标表的表名、字段名转换为小写。
流加载标签前缀 String 否 - StarRocks Stream Load (流加载)导入作业的标签的前缀。用于任务级别去重与事务管理。
批量提交最大缓存行数 String 否 1024 对于批处理写入,当缓冲区数达到 batch_max_rows 数或 batch_max_bytes 字节大小或时间达到 batch_interval_ms 时,数据将被刷新到StarRocks中。
批量提交最大缓存 Int 否 5 对于批处理写入,当缓冲区数达到 batch_max_rows 数或 batch_max_bytes 字节大小或时间达到 batch_interval_ms 时,数据将被刷新到 StarRocks 中。
最大重试次数 Int 否 0 失败的最大重试次数。
重试乘数 Int 否 0 用作产生下一个重试延迟的乘数,详细说明参考:http://beginners-java.blogspot.com/2015/03/customize-volley-default-retry-policy.html
重试等待时间 Int 否 0 尝试重试 StarRocks 请求之前等待的时间。
启用upsert语法支持 Boolean 否 否 是否启用 upsert/delete,只支持 PrimaryKey 模式。 false:
可选参数 Map 否 - 可以根据需求进行配置。可选参数名称可以参考 Seatunnel 官方手册。
json 格式如:
{
"starrocks.config": "{ "format" :"JSON", "strip_outer_array": true }"
}

表添加模式为“重新创建表”和“表不存在时创建”,【获取表】后,可以点击每张表后的【目标表参数】按钮分别为表配置更多的参数,具体如下:

配置名称 数据类型 是否必填 默认值 描述
名称 String 是 - 表名称。
建表类型 String 是 明细表 三种类型:主键表、明细表、聚合表、更新表。
自定义模板 Boolean 否 否 是否启用自定义模板的开关。启用后可以自己定义模板内容。
选择分桶键字段 List 否 - 从下拉列表中选择表字段作为分桶键。
建表模板 String 否 - 自 v3.1.0 起,StarRocks 支持明细表进行随机分桶(默认分桶方式)。在建表和新增分区时可以不设置哈希分桶键(即 DISTRIBUTED BY HASH 子句)。在 v3.1.0 之前,StarRocks 仅支持哈希分桶。您在建表和新增分区时必须设置哈希分桶键(即 DISTRIBUTED BY HASH 子句),否则建表失败。

# 关键功能与原理

1. 写入机制:缓存 + Stream Load

  • Sink将数据缓存在内存中,不逐行写入;
  • 触发刷写的条件三个取首个达到的阈值:
    • 行数 ≥ batch_max_rows
    • 字节数 ≥ batch_max_bytes
    • Checkpoint触发(在Zeta/Flink引擎中,即使未达到前两个阈值,也会强制写入)
  • 刷写时调用StarRocks BE的/api/stream_load接口,一次HTTP请求批量提交。

2. 一致性语义:至少一次

  • StarRocks Sink目前不原生支持两阶段提交(2PC),不提供内置的“精确一次”;
  • 生产上通过“主键去重”实现语义上的精确一次:即使任务失败重试导致重复写入,主键模型会自动去重,最终结果一致;
  • 若需要真正的两阶段提交(写时事务),需关注Doris Connector(其支持sink.enable-2pc),StarRocks与Doris在此处有差异。

3. CDC变更数据同步

  • 配置enable_upsert_delete = true后,可接收来自上游(如MySQL CDC)的INSERT/UPDATE/DELETE事件;
  • 依赖条件:StarRocks目标表必须为主键模型,否则UPDATE/DELETE无效。

4. Schema自动管理

  • 自动建表:通过save_mode_create_template定义建表语句模板,连接器根据上游Schema填充字段和主键;
  • Schema Evolution:StarRocks Sink实现了SupportSchemaEvolutionSinkWriter,可自动处理上游新增列,对目标表执行ALTER TABLE ADD COLUMN (2.3.9+)。

# 性能调优

  1. 调大 batch_max_rows 和 batch_max_bytes
  • 默认1024行/5MB太小,生产建议10240行/50MB~100MB,有效减少HTTP请求次数。
  1. 调短 checkpoint.interval
  • 若任务对延迟敏感,可将checkpoint.interval设小(如10秒),强制Checkpoint触发刷写,避免数据在内存积压过久。
  1. 增加并行度
  • 在Source端增加parallelism,多个Sink Task并行写入不同Bucket,提升吞吐。
  1. StarRocks侧优化
  • 设置合理的Bucket数,避免小文件过多;
  • BE配置streaming_load_max_mb,SeaTunnel侧batch_max_bytes不得超过该值。

# FAQ

← RocketMQ S3 →