虽然这不完全符合您的要求,但您可以通过使用具有通用功能的 用户定义的 Java 类 组件来实现您的目标。将您的步骤Get previous row fields 到Non-values in row 替换为此组件的单个实例。在该组件的 Classes - Processor 部分中插入以下代码:
Object[] previousRow;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop
Object[] r = getRow();
// If the row object is null, we are done processing.
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
// copy all input fields to the output fields
for (int i=0; i < getInputRowMeta().size(); i++) {
logBasic(data.inputRowMeta.getString(r, i));
if (data.inputRowMeta.getString(r, i) == null && (previousRow != null)) {
// if the current field is empty take it from the previous row
outputRow[i] = previousRow[i];
}
else {
// otherwise use the current row
outputRow[i] = r[i];
}
}
putRow(data.outputRowMeta, outputRow);
// store the current row as future previous row
previousRow = data.outputRowMeta.cloneRow(outputRow);
return true;
}
Janino 类始终保留上一行的副本以填充当前行的空白字段。
以下测试设置演示了组件的使用。在最简单的情况下,我们处理从 CSV 文件读取的流:
输入文件配置如下:
并包含以下数据
NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014;12,5
2;B;;13,5
;;03.12.2001;
4;;;
5;C;;
6;;20.03.2005;18,2
7;D;;
用户定义的Java类组件的配置如下:
输出文本文件包含“无间隙”的增强行:
NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014; 012,50
2;B;01.02.2014; 013,50
2;B;03.12.2001; 013,50
4;B;03.12.2001; 013,50
5;C;03.12.2001; 013,50
6;C;20.03.2005; 018,20
7;D;20.03.2005; 018,20
注意:
- 该组件已针对这四种数据类型进行了测试,但原则上它应该适用于所有数据类型。
- 它与实际的字段数无关。
- 一旦填写了某个字段,就永远不会“未填写”,这对您的设置(我猜)很好,但这可能不适用于其他设置。
- 该机制仅在字段为
null 时有效。仅包含空格的字符串可能会破坏它,因此请确保在将所有字符串通过管道传递到组件之前对其进行修剪。
代码是使用http://wiki.pentaho.com/display/EAI/User+Defined+Java+Class作为教程编写的。
附录
@manu 提供的链接包含以下代码。它包含对数字格式的特定处理。请注意,它不再是完全通用的。
Object[] previousRow;
RowMetaInterface outputMeta;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop
Object[] r = getRow();
// If the row object is null, we are done processing.
if (r == null) {
setOutputDone();
return false;
}
if (outputMeta == null) {
outputMeta = data.outputRowMeta.clone();
for(int i=0; i < outputMeta.size(); i++) {
ValueMetaInterface meta = outputMeta.getValueMeta(i);
if (meta.getType() == ValueMetaInterface.TYPE_NUMBER) {
meta.setPrecision(4);
meta.setConversionMask("#.####");
}
}
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
// copy all input fields to the output fields
for (int i=0; i < getInputRowMeta().size(); i++) {
if ((r[i] == null) && (previousRow != null)) {
// if the current field is empty take it from the previous row
outputRow[i] = previousRow[i];
}
else {
// otherwise use the current row
outputRow[i] = r[i];
}
}
putRow(outputMeta, outputRow);
// store the current row as future previous row
previousRow = outputMeta.cloneRow(outputRow);
return true;
}