# StarRocks Sink组件使用说明
# 组件说明
用于将数据写入 StarRocks 数据库。
提示
点击对应表的【目标表参数】按钮展开的配置项,完成配置后必须点击弹窗底部的【保存配置】按钮才能保存各自表的配置参数。
# 配置项说明
| 配置名称 | 数据类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| 节点名称 | String | 是 | StarRocks | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 节点编码 | String | 是 | 自动生成 | 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。确保唯一性。 |
| 选择数据源 | String | 是 | - | 从下拉选项中选择列出的当前项目已经关联的数据源。 |
| 选择库 | String | 是 | - | 从下拉列表中选择数据库。 |
| 表添加模式 | String | 否 | 表不存在时创建 | 有四种: - 重新创建表 - 表不存在时创建 - 表不存在时报告错误 - 忽略对表的处理 |
| 选择表 | String | 是 | - | 从下拉列表中选择数据表。 |
| 数据添加模式 | String | 否 | 追加数据 | 支持四种模式: 1、追加数据:数据统一追加到目标表中; 2、同步前删除数据:先清空目标表数据,然后再给目标表添加数据; 3、当存在数据时,报告错误:当目标表存在相同数据时,报告错误,不会添加数据。 4、自定义处理:用户可以自定义写SQL的方式进行处理。 |
| 自定义SQL | 当「数据添加模式」为「自定义处理」时,执行的自定义SQL(如TRUNCATE TABLE)。 | |||
| 大小写转换模式 | String | 否 | 不转换 | 支持三种配置: - 不转换:目标表与来源表的表名、字段名大小写一致; - 转大写:将目标表的表名、字段名转换为大写; - 转小写:将目标表的表名、字段名转换为小写。 |
| 流加载标签前缀 | String | 否 | - | StarRocks Stream Load (流加载)导入作业的标签的前缀。用于任务级别去重与事务管理。 |
| 批量提交最大缓存行数 | String | 否 | 1024 | 对于批处理写入,当缓冲区数达到 batch_max_rows 数或 batch_max_bytes 字节大小或时间达到 batch_interval_ms 时,数据将被刷新到StarRocks中。 |
| 批量提交最大缓存 | Int | 否 | 5 | 对于批处理写入,当缓冲区数达到 batch_max_rows 数或 batch_max_bytes 字节大小或时间达到 batch_interval_ms 时,数据将被刷新到 StarRocks 中。 |
| 最大重试次数 | Int | 否 | 0 | 失败的最大重试次数。 |
| 重试乘数 | Int | 否 | 0 | 用作产生下一个重试延迟的乘数,详细说明参考:http://beginners-java.blogspot.com/2015/03/customize-volley-default-retry-policy.html |
| 重试等待时间 | Int | 否 | 0 | 尝试重试 StarRocks 请求之前等待的时间。 |
| 启用upsert语法支持 | Boolean | 否 | 否 | 是否启用 upsert/delete,只支持 PrimaryKey 模式。 false: |
| 可选参数 | Map | 否 | - | 可以根据需求进行配置。可选参数名称可以参考 Seatunnel 官方手册。 json 格式如: { "starrocks.config": "{ "format" :"JSON", "strip_outer_array": true }" } |
表添加模式为“重新创建表”和“表不存在时创建”,【获取表】后,可以点击每张表后的【目标表参数】按钮分别为表配置更多的参数,具体如下:
| 配置名称 | 数据类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| 名称 | String | 是 | - | 表名称。 |
| 建表类型 | String | 是 | 明细表 | 三种类型:主键表、明细表、聚合表、更新表。 |
| 自定义模板 | Boolean | 否 | 否 | 是否启用自定义模板的开关。启用后可以自己定义模板内容。 |
| 选择分桶键字段 | List | 否 | - | 从下拉列表中选择表字段作为分桶键。 |
| 建表模板 | String | 否 | - | 自 v3.1.0 起,StarRocks 支持明细表进行随机分桶(默认分桶方式)。在建表和新增分区时可以不设置哈希分桶键(即 DISTRIBUTED BY HASH 子句)。在 v3.1.0 之前,StarRocks 仅支持哈希分桶。您在建表和新增分区时必须设置哈希分桶键(即 DISTRIBUTED BY HASH 子句),否则建表失败。 |
# 关键功能与原理
1. 写入机制:缓存 + Stream Load
- Sink将数据缓存在内存中,不逐行写入;
- 触发刷写的条件三个取首个达到的阈值:
- 行数 ≥ batch_max_rows
- 字节数 ≥ batch_max_bytes
- Checkpoint触发(在Zeta/Flink引擎中,即使未达到前两个阈值,也会强制写入)
- 刷写时调用StarRocks BE的/api/stream_load接口,一次HTTP请求批量提交。
2. 一致性语义:至少一次
- StarRocks Sink目前不原生支持两阶段提交(2PC),不提供内置的“精确一次”;
- 生产上通过“主键去重”实现语义上的精确一次:即使任务失败重试导致重复写入,主键模型会自动去重,最终结果一致;
- 若需要真正的两阶段提交(写时事务),需关注Doris Connector(其支持sink.enable-2pc),StarRocks与Doris在此处有差异。
3. CDC变更数据同步
- 配置enable_upsert_delete = true后,可接收来自上游(如MySQL CDC)的INSERT/UPDATE/DELETE事件;
- 依赖条件:StarRocks目标表必须为主键模型,否则UPDATE/DELETE无效。
4. Schema自动管理
- 自动建表:通过save_mode_create_template定义建表语句模板,连接器根据上游Schema填充字段和主键;
- Schema Evolution:StarRocks Sink实现了SupportSchemaEvolutionSinkWriter,可自动处理上游新增列,对目标表执行ALTER TABLE ADD COLUMN (2.3.9+)。
# 性能调优
- 调大 batch_max_rows 和 batch_max_bytes
- 默认1024行/5MB太小,生产建议10240行/50MB~100MB,有效减少HTTP请求次数。
- 调短 checkpoint.interval
- 若任务对延迟敏感,可将checkpoint.interval设小(如10秒),强制Checkpoint触发刷写,避免数据在内存积压过久。
- 增加并行度
- 在Source端增加parallelism,多个Sink Task并行写入不同Bucket,提升吞吐。
- StarRocks侧优化
- 设置合理的Bucket数,避免小文件过多;
- BE配置streaming_load_max_mb,SeaTunnel侧batch_max_bytes不得超过该值。