普元数据集成平台 普元数据集成平台
产品介绍
安装部署
快速入门
操作指南
FAQ
  • 作业流
  • 作业类
  • 程序类
  • 控制类
  • 数据加工-转换作业
  • 数据加工-编排作业
  • 数据同步作业
  • 通用模板作业
  • 子作业流
  • SQL 程序
  • Shell 程序
  • Spark 程序
  • Flink 程序
  • MapReduce 程序
  • DI 程序
  • Python 程序
  • Dependent
  • Switch
  • Conditions

# 作业流

作业流是通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG,用户可以将已经开发好的作业、程序拖拽至画布,建立连线。

当用户通过手动启动或定时调度运行作业流时会生成作业流实例,作业流定义每运行一次,会产生一个作业流实例,任务会按照作业流定义进行顺序执行。

如果需要进行依赖检查、条件判断、复杂上游状态依赖时,可以使用Dependent、Switch、Conditions。

本章节将详细介绍作业流定义中各个任务类型的功能及属性,具体如下:

# 作业类

  • 数据加工-转换作业
  • 数据加工-编排作业
  • 数据同步作业
  • 通用模板作业
  • 子作业流

# 程序类

  • SQL 程序
  • Shell 程序
  • Spark 程序
  • Flink 程序
  • MapReduce 程序
  • DI 程序
  • Python程序

# 控制类

  • Dependent
  • Switch
  • Conditions

作业流的画布布局如下图所示:

workflow

通用配置项说明:

配置项 是否必填 默认值 说明
作业名称 是 - 设置作业流的名称。
执行策略 是 串行优先 串行优先:如果对于同一个作业流定义,同时有多个作业流实例,则停止前边已经生成的作业流实例,执行最新生成的作业流实例。
串行等待:如果对于同一个作业流定义,同时有多个作业流实例,则按时间先后顺序串行执行作业流实例,后生成的作业流状态为串行等待。
串行抛弃:如果对于同一个作业流定义,同时有多个作业流实例,则抛弃后生成的作业流实例,后生成的作业流实例状态为停止。
并行:如果对于同一个作业流定义,同时有多个作业流实例,则并行执行作业流实例。
默认是"串行优先"。
全局参数 否 - 作业流的用户自定义参数。
超时告警 否 不开启 勾选超时告警,表示启用告警。
超时时间 否 30分钟 勾选超时告警后可以配置超时时间,当任务超过”超时时间”后,会发送告警并且任务执行失败。
备注 否 - 该作业流的描述。

# 数据加工-转换作业

功能介绍:在作业流中添加已经开发完的数据加工作业(转换作业)。

属性界面:

新建时在"作业路径"下拉框中选择已有数据加工作业(转换作业)。

di

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

di

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"作业路径"对应的集成作业(批量转换作业)画布。

di

# 数据加工-编排作业

功能介绍:在作业流中添加已经开发完的数据加工作业(编排作业)。

属性界面:

新建时在"作业路径"下拉框中选择已有数据加工(编排作业)。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"作业路径"对应的集成作业(批量编排作业)画布。

# 数据同步作业

功能介绍:在作业流中添加已经开发完的数据同步作业。

属性界面:

新建时在"作业路径"下拉框中选择已有数据同步作业。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"作业路径"对应的数据同步作业画布。

# 通用模板作业

功能介绍:在作业流中引用已有通用模板作业。

属性界面:

新建时在"作业路径"下拉框中选择“处理引擎”、“模板类型”、“作业路径”。

template

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"作业路径"对应的通用模板作业画布。

# 子作业流

功能介绍:在作业流中引用已有子作业流,把外部的某个作业流定义当做一个节点去执行。

属性界面:

新建时弹出窗口,选择"数据层"、"作业流",其他属性可以使用默认值。

subworkflow

在图元上右键,弹出菜单,选择【编辑】,可以修改"数据层"、"作业流",其他属性。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元对应的作业流画布。

# SQL 程序

功能介绍:在作业流中引用已有SQL 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有SQL 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的SQL 程序画布。

# Shell 程序

功能介绍:在作业流中引用已有Shell 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有Shell 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的Shell 程序画布。

# Spark 程序

功能介绍:在作业流中引用已有Spark 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有Spark 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的Spark 程序画布。

# Flink 程序

功能介绍:在作业流中引用已有Flink 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有Flink 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的Flink 程序画布。

# MapReduce 程序

功能介绍:在作业流中引用已有MapReduce 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有 MapReduce 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的 MapReduce 程序画布。

# DI 程序

功能介绍:在作业流中引用已有 DI 程序。

属性界面:

新建时下拉框中选择"程序类型"、"程序路径"。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的 DI 程序画布。

# Python 程序

功能介绍:在作业流中引用已有 Python 程序。

属性界面:

新建时在"程序路径"下拉框中选择已有 Python 程序。

在图元上右键,弹出菜单,选择【编辑】,可以修改"节点名称"和"备注"。

在图元上右键,弹出菜单,选择【打开】,直接跳转到该图元"程序路径"对应的 Python 程序画布。

# Dependent

功能介绍:Dependent依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。

属性界面:

新建时在下拉框中选择"任务优先级"、"Worker分组"并添加依赖组和添加依赖。

dependent

在图元上右键,弹出菜单,选择【编辑】,可以修改"任务优先级"、"Worker分组"、"依赖组"和"依赖"。

配置项说明:

配置项 是否必填 默认值 说明
节点名称 是 Dependent 设置节点的名称。同一个作业流中确保唯一。
任务优先级 是 MEDIUM 优先级分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
Worker分组 是 default 在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
超时告警 否 不开启 勾选超时告警、超时失败,当任务超过”超时时间”后,会发送告警邮件并且任务执行失败。
告警策略 否 超时告警 当启用超时告警后,提供两种策略选择:
超时告警:超时后发出告警通知;
超时失败:超时后作业流执行失败。
超时时间 否 30分钟 勾选超时告警后可以配置超时时间,当任务超过”超时时间”后,会发送告警并且任务执行失败。
添加依赖 是 - 添加依赖检查节点。

# Switch

功能介绍:Switch 是一个条件判断节点,依据全局变量的值和用户所编写的表达式判断结果执行对应分支。注意使用 javax.script.ScriptEngine.eval 执行表达式。

⚠️ 提示:使用 Switch 时需要在画布中拖拽好其他分支节点并建立连线,这样才能在 Switch 属性中进行选择。

属性界面:

新建时在下拉框中选择"任务优先级"、"Worker分组"、"默认分支"、"条件"。

switch

在图元上右键,弹出菜单,选择【编辑】,可以修改"任务优先级"、"Worker分组"、"默认分支"、"条件"。

配置项说明:

配置项 是否必填 默认值 说明
节点名称 是 Dependent 设置节点的名称。同一个作业流中确保唯一。
任务优先级 是 MEDIUM 优先级分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
Worker分组 是 default 在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
失败重试次数 否 0 次 执行失败的次数。
失败重试间隔 否 1 分钟 执行失败后的重试间隔时间。
超时告警 否 不开启 勾选超时告警、超时失败,当任务超过”超时时间”后,会发送告警邮件并且任务执行失败。
告警策略 否 超时告警 当启用超时告警后,提供两种策略选择:
超时告警:超时后发出告警通知;
超时失败:超时后作业流执行失败。
超时时间 否 30分钟 勾选超时告警后可以配置超时时间,当任务超过”超时时间”后,会发送告警并且任务执行失败。
添加条件 是 - 添加条件判断节点。
默认分支 是 - 默认的流转内容,当条件中的内容为全部不符合要求时,则运行分支流转中指定的分支。
条件 是 - 可以为 switch 任务配置多个条件,当条件满足时,就会执行指定的分支,可以配置多个不同的条件来满足不同的业务,使用字符串判断时需要使用””
条件分支 是 - 当条件中的内容为不符合要求时,则运行分支流转中指定的分支。

# Conditions

功能介绍:Conditions 是一个条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。目前 Conditions 支持多个上游任务,但只支持两个下游任务。当上游任务数超过一个时,可以通过且以及或操作符实现复杂上游依赖

⚠️ 提示:
1、使用 Conditions 时需要在画布中拖拽好其他分支节点并建立连线,这样才能在 Conditions 属性中进行选择。
2、Conditions 任务支持多个上游任务,但只支持两个下游任务。
3、Conditions 任务以及包含该任务的工作流不支持复制操作。
4、Conditions 的前置任务不能连接其分支节点,会造成逻辑混乱,不符合 DAG 调度。

属性界面:

新建时在下拉框中选择"任务优先级"、"Worker分组"、"默认分支"、"成功执行"、"失败执行"。

condition

在图元上右键,弹出菜单,选择【编辑】,可以修改"任务优先级"、"Worker分组"、"默认分支"、"成功执行"、"失败执行"。

配置项说明:

配置项 是否必填 默认值 说明
节点名称 是 Dependent 设置节点的名称。同一个作业流中确保唯一。
任务优先级 是 MEDIUM 优先级分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
Worker分组 是 default 在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
失败重试次数 否 0 次 执行失败的次数。
失败重试间隔 否 1 分钟 执行失败后的重试间隔时间。
超时告警 否 不开启 勾选超时告警、超时失败,当任务超过”超时时间”后,会发送告警邮件并且任务执行失败。
告警策略 否 超时告警 当启用超时告警后,提供两种策略选择:
超时告警:超时后发出告警通知;
超时失败:超时后作业流执行失败。
超时时间 否 30分钟 勾选超时告警后可以配置超时时间,当任务超过”超时时间”后,会发送告警并且任务执行失败。
条件配置 是 - 根据前置任务的状态来跳转到对应的分支:
成功分支:当上游运行成功时,运行成功选择的分支;
失败分支:当上游运行失败时,运行失败选择的分支。
流转配置 是 - 可以为 Conditions 任务选择一个或多个上游任务:
增加上游依赖:通过选择第一个参数选择对应的任务名称,通过第二个参数选择触发的 Conditions 任务的状态;
上游任务关系选择:当有多个上游任务时,可以通过且以及或操作符实现任务的复杂关系。

← Python程序 数据源 →