# 脚本组件
# JAVA
功能介绍:JAVA组件编写 Java 程序
使用场景:采用 Java 组件中所提供的方法类似于组件开发的内容,是组件开发的一个子集,仅能够实现组件开发的processRow方法、init方法和dispose方法。该组件仅适合编写一些简单的逻辑转换不适合编写复杂的逻辑转换。如果要处理的逻辑转换比较复杂建议开发新的组件来完成。
图标:
组件界面:
界面中左侧代码片段树包含了常用到的方法和输入输出字段,右侧是代码编辑区,下面是将代码中的字段添加到数据流中的字段编辑区。 一般的使用是首先将光标移动到代码编辑区,然后在代码片段树中从Code Snippits中的Common use中双击Main方法。这时会在代码编辑区中生成processRow方法。方法内会产生基本的获取数据和传递数据的代码。
参数说明:
参数 | 说明 |
节点名称 | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。 |
字段列表 | 当有java程序中有新增字段生成输出,需在该处指定新生成字段相关信息 |
参数列表 | 定义输入参数,例如,指定从输入流中获取值的字段名称 |
消息步骤 | 指定输入流来源步骤 |
目标步骤 | 指定输入流输出步骤 |
Code Snippits/Common use | Main:生成主类模板 |
init:定义一些初始化资源 | |
dispose:正常执行完毕还是异常执行完毕,调用该方法,可用于释放资源 |
方法说明:
# Common use(常用)
# Main
方法说明:生成processRow方法默认实现,用于进行数据转换操作,该方法只要上一个步骤有数据传递过来,会一直执行该方法,直到上一个步骤传递过来的数据被全部处理完。
方法使用:
public boolean processRow(StepMetaInterface smi,StepDataInterface sdi) throws DataInterfaceException{
Object[] r = getRow();
if(r == null){
setOutputDone();
return false;
}
if(first){
first = false;
}
r = createOutputRow(r,data.outputRowMeta.size());
putRow(data.outputRowMeta,r);
return true;
}
# Implement init
方法说明:生成init方法默认实现,用于数据初始化,在进行init方法之后才会执行processRow方法。
方法使用:
public void init(StepMetaInterface smi,StepDataInterface sdi){
parent.init(smi,sdi);
}
# Implement dispose
方法说明:生成dispose方法默认实现,用于在数据转换完成后的数据关闭操作,执行完processRow方法后才会调用该方法。
方法使用:
public void dispose(StepMetaInterface smi,StepDataInterface sdi){
parent.dispose(smi,sdi);
}
# getRow
方法说明:该方法能够获取上一个步骤处理完后的一行数据,这个方一般在processRow方法中使用。
方法使用:Object[] row = getRow();
# getRowFrom
方法说明:该方法能够获取上一个特定步骤处理完后的一行数据,这个方一般在processRow方法中使用。
方法使用:Object[] row = getRowFrom(rowSet);
# putRow
方法说明:这个方法用于将处理完成后的记录传递给下一个步骤,这个方一般在processRow方法中使用。
方法使用:putRow(data,outputRowMeta,r);
# putRowTo
方法说明:这个方法用于将处理完成后的记录传递给下一个特定步骤,这个方一般在processRow方法中使用。
方法使用:putRowTo(rowMeta,row,rowSet);
# putError
方法说明:这个方法是在处理数据时如果遇到异常产生错误信息后,将错误信息传递给异常连线的步骤时使用,这个方一般在processRow方法中使用。
方法使用:putError(rowMeta,row,nrErrors,errorDescriptions,fieldNames,erroeCodes);
# setErrors
方法说明:这个方法是在处理数据时如果遇到异常产生错误信息后停止作业执行,这个方一般在processRow方法中使用。
方法使用:setErrors(1);
# stopAll
方法说明:这个方法用于停止转换运行,这个方一般在processRow方法中使用。
方法使用:stopAll();
# setOutputDone
方法说明:这个方法用于步骤处理完数据时停止步骤运行,这个方一般在processRow方法中使用。
方法使用:setOutputDone();
# getVariable
方法说明:这个方法用于获取变量值,这个方一般在processRow方法中使用。
方法使用:String getVariable = getVariable(variableName,defaultValue);
# checkFeedback
方法说明:这个方法用于检查处理记录数,这个方一般在processRow方法中使用。
方法使用:boolean checkFeedback = checkFeedback(lines);
# addResultFile
方法说明:这个方法用于添加结果文件,这个方一般在processRow方法中使用。
方法使用:addResultFile(resultFile);
# getResultFiles
方法说明:这个方法用于获取结果文件,这个方一般在processRow方法中使用。
方法使用:Map getResultFiles = getResultFiles();
# Step status(步骤状态)
# incrementLinesInput
方法说明:这个方法用于添加输入记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesInput = incrementLinesInput();
# incrementLinesOutput
方法说明:这个方法用于添加输出记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesOutput = incrementLinesOutput();
# incrementLinesRead
方法说明:这个方法用于添加读取记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesRead = incrementLinesRead();
# incrementLinesRejected
方法说明:这个方法用于添加拒绝记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesRejected = incrementLinesRejected();
# incrementLinesSkipped
方法说明:这个方法用于添加错误记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesSkipped = incrementLinesSkipped();
# incrementLinesUpdated
方法说明:这个方法用于添加更新记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesUpdated = incrementLinesUpdated();
# incrementLinesWritten
方法说明:这个方法用于添加写记录数,这个方一般在processRow方法中使用。
方法使用:long incrementLinesWritten = incrementLinesWritten();
# getLinesInput
方法说明:这个方法用于获取输入记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesInput = getLinesInput();
# getLinesOutput
方法说明:这个方法用于获取输出记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesOutput = getLinesOutput();
# getLinesRead
方法说明:这个方法用于获取读取记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesRead = getLinesRead();
# getLinesRejected
方法说明:这个方法用于获取拒绝记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesRejected = getLinesRejected();
# getLinesSkipped
方法说明:这个方法用于获取错误记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesSkipped = getLinesSkipped();
# getLinesUpdated
方法说明:这个方法用于获取更新记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesUpdated = getLinesUpdated();
# getLinesWritten
方法说明:这个方法用于获取写记录数,这个方一般在processRow方法中使用。
方法使用:long getLinesWritten = getLinesWritten();
# getProcessed
方法说明:这个方法用于获取处理记录数,这个方一般在processRow方法中使用。
方法使用:long getProcessed = getProcessed();
# getStatus
方法说明:这个方法用于获取执行状态,这个方一般在processRow方法中使用。
方法使用:StepExecutionStatus getStatus = getStatus();
# getStatusDescription
方法说明:这个方法用于获取执行状态描述,这个方一般在processRow方法中使用。
方法使用:String getStatusDescription = getStatusDescription();
# setLinesInput
方法说明:这个方法用于设置输入操作数,这个方一般在processRow方法中使用。
方法使用:setLinesInput(newLinesInputValue);
# setLinesOutput
方法说明:这个方法用于设置输出操作数,这个方一般在processRow方法中使用。
方法使用:setLinesOutput(newLinesOutputValue);
# setLinesRead
方法说明:这个方法用于设置读取操作数,这个方一般在processRow方法中使用。
方法使用:setLinesRead(newLinesReadValue);
# setLinesRejected
方法说明:这个方法用于设置拒绝操作数,这个方一般在processRow方法中使用。
方法使用:setLinesRejected(linesRejected);
# setLinesSkipped
方法说明:这个方法用于设置错误操作数,这个方一般在processRow方法中使用。
方法使用:setLinesSkipped(newLinesSkippedValue);
# setLinesUpdated
方法说明:这个方法用于设置更新操作数,这个方一般在processRow方法中使用。
方法使用:setLinesUpdated(newLinesUpdatedValue);
# setLinesWritten
方法说明:这个方法用于设置写操作数,这个方一般在processRow方法中使用。
方法使用:setLinesWritten(newLinesWrittenValue);
# Setp Logging(日志)
# logBasic
方法说明:这个方法用于记录基本日志。
方法使用:logBasic(msg);
# logDebug
方法说明:这个方法用于记录调试日志。
方法使用:logDebug(msg);
# logDetailed
方法说明:这个方法用于记录详细日志。
方法使用:logDetailed(msg);
# logError
方法说明:这个方法用于记录错误日志。
方法使用:logError(msg);logError(msg,e);
# logMinimal
方法说明:这个方法用于记录最小日志。
方法使用:logMinimal(msg);
# logRowlevel
方法说明:这个方法用于记录行级日志。
方法使用:logRowlevel(msg);
# Step/Row Listeners(步骤监听)
# addRowListener
方法说明:这个方法用于添加行监听器。
方法使用:addRowListener(rowListener);
# addStepListener
方法说明:这个方法用于添加步骤监听器。
方法使用:addStepListener(stepListener);
# getRowListeners
方法说明:这个方法用于获取行监听器。
方法使用:List getRowListeners = getRowListeners();
# getStepListeners
方法说明:这个方法用于获取步骤监听器。
方法使用:List getStepListeners = getStepListeners();
# removeRowListener
方法说明:这个方法用于删除行监听器。
方法使用:removeRowListener(rowListener);
# Row manipulation(数据操作)
# getInputRowMeta
方法说明:这个方法用于获取行的元数据。
方法使用:RowMetaInterface getInputRowMeta = getInputRowMeta();
# getErrorRowMeta
方法说明:这个方法用于获取异常行的元数据。
方法使用:RowMetaInterface getErrorRowMeta = getErrorRowMeta();
# getInputRowSets
方法说明:这个方法用于获取上一步骤集合。
方法使用:List getInputRowSets = getInputRowSets();
# getOutputRowSets
方法说明:这个方法用于获取下一步骤集合。
方法使用:List getOutputRowSets = getOutputRowSets();
# findInputRowSet
方法说明:这个方法用于查找上一步骤。
方法使用:RowSet findInputRowSet = findInputRowSet(sourceStep)
# findOutputRowSet
方法说明:这个方法用于查找下一步骤。
方法使用:RowSet findOutputRowSet = findOutputRowSet(targetStep);
# Uncommon use(常见操作)
# getStepMeta
方法说明:这个方法用于获取步骤元数据信息。
方法使用:StepMeta getStepMeta = getStepMeta();
# getTrans
方法说明:这个方法用于获取转换信息。
方法使用:Trans getTrans = getTrans();
# getTransMeta
方法说明:这个方法用于获取转换元数据信息。
方法使用:TransMeta getTransMeta = getTransMeta();
# getSetpDataInterface
方法说明:这个方法用于获取组件数据信息。
方法使用:SetpDataInterface getSetpDataInterface = getSetpDataInterface();
# getPartitionID
方法说明:这个方法用于获取分区信息。
方法使用:String getPartitionID = getPartitionID();
# getPartitionTargets
方法说明:这个方法用于获取下一步骤分区信息。
方法使用:Map getPartitionTargets = getPartitionTargets();
# getRepartitioning
方法说明:这个方法用于获取分区个数。
方法使用:int getRepartitioning = getRepartitioning();
# isParitioned
方法说明:这个方法用于判断是否存在分区。
方法使用:boolean isParitioned = isParitioned();
# getClusterSize
方法说明:这个方法用于获取集群个数。
方法使用:int getClusterSize = getClusterSize();
# getErrors
方法说明:这个方法用于获取错误数。
方法使用:long getErrors = getErrors();
# getRuntime
方法说明:这个方法用于获取运行时长。
方法使用:long getRuntime = getRuntime();
# getSlaveNr
方法说明:这个方法用于获取子服务器个数。
方法使用:int getSlaveNr = getSlaveNr();
# getStepname
方法说明:这个方法用于获取步骤名称。
方法使用:String getStepname = getStepname();
# setVariable
方法说明:这个方法用于设置变量。
方法使用:setVariable(variableName,variableValue);
# JavaScript
功能介绍:JavaScript组件用于编写 JavaScript 脚本程序
使用场景:直接javaScript脚本
图标:
组件界面:
参数说明:
参数 | 说明 |
---|---|
节点名称 | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。 |
字段列表 | 当有javaScript脚本中有新增字段生成输出,需在该处指定新生成字段相关信息,且可以重新定义输出字段在输出流中名称 |
# 执行SQL脚本
功能介绍:执行SQL脚本用于在转换的开始阶段或者对传入该步骤的每一行数据执行SQL脚本
使用场景:执行一条或多条sql语句
图标:
组件界面:
参数说明:
参数 | 说明 |
节点名称 | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。 |
选择数据源 | 选择输出数据源 |
SQL编辑 | 需要执行的sql,勾选此选项时,才能进行变量或参数替换 |
执行设置 | 执行每一行:组件初始化时执行,勾选时,sql才能进行变量或参数替换 |
作为单个语句执行:勾选此选项,执行多条;分隔的sql语句 | |
变量替换:勾选此选项,使用变量替换${}标识字段 | |
绑定参数:勾选此选项,可以使用输入流中字段作为参数,否则,只能使用'?'作为参数标识,参数字段值需要和'?'位置对应 | |
引用字符串:勾选此选项,值为字符串参数,用quote(''/"",根据数据库而定)包裹 | |
作为参数字段 | 勾选绑定参数,选择前置步骤字段作为参数,否则,用于填写字段值 |
包含插入状态字段 | 指定包含插入记录数量的统计信息字段 |
包含更新状态字段 | 指定包含更新记录数量的统计信息字段 |
包含删除状态字段 | 指定包含删除记录数量的统计信息字段 |
包含读状态字段 | 指定包含查询记录数量的统计信息字段 |