【问题标题】:Filling data gaps in a stream in Pentaho Data Integration, is it possible?在 Pentaho 数据集成中填补流中的数据空白,有可能吗?
【发布时间】:2014-11-20 07:31:23
【问题描述】:

我有一个包含欧元兑美元汇率的 CSV 文件。该文件是从Bank of Canada 下载的。我从 2013 年 10 月 10 日起下载了包含数据的 CSV。

然而,数据中存在差距,即。没有转换率的天数。

我一直在努力(与 Spoon Kettle 的第一天)寻找一种简单(但通用)的方法来填补空白,例如,使用最后一个非空值。我设法做到这一点的唯一方法是链接 4 个“获取上一行字段”并在计算器中使用 NVL 来获取第一个非空值。但这仅适用于流中的间隙不大于 4 行的情况。

图像代表变换:

我的第一个问题简化为:是否有一种通用方法可以在有间隙的流中进行插值/外插?

我尝试使用“修改后的 JavaScript 值”,但 API 仍然让我无法理解。此外,这一步似乎只有 MapReduce 组合的 Map 部分,我可能需要两者。

那么,我的第二个问题是:有没有办法用非 Java 语言(Scala、Clojure、Jython 或 JS)编写 MapReduce 组合?

【问题讨论】:

  • 一个组件在 pentaho 中是否需要两个输入?

标签: pentaho kettle


【解决方案1】:

您可以结合使用以下三个步骤:

1) 分析查询 - 允许您获取当前行之前或之后 N 行的字段的值;在您的情况下,您需要提前 1 行获取日期(下一个可用日期)

2) 计算器 - 确定了行的前一个日期,使用它来计算日期之间的天数;

3)计算一个字段number_of_clones为dbd-1(缺失的天数;

4) 使用 Clone Rows 步骤中的该字段根据需要多次复制一行;添加一个 clone_number 字段

5) 将 clone_number 作为天数添加到日期中,即可得到它所指的日期。

此外,分析查询步骤允许您将一个字段指定为“分组依据”字段,这样如果您有美元的 x 汇率,然后您有英镑的 x 汇率,那么最终的美元 x 汇率日将检索 null 作为下一个值。

这是一个示例 KTR 文件:

数据网格步骤会生成几行,其中存在一些数据间隙:

分析查询获取下一个日期,相同的货币价值

然后计算器步骤计算缺少多少行。请注意,每种货币的最后一天都会有 null 作为值,所以我们需要调整它并使用 0 代替(如果 A 为空,NVL(A,B) 返回 B,否则返回 A)

克隆行:获取一行并创建副本。

clone_number 字段允许我们计算行引用的实际日期

最后,这是数据。您需要的字段是 new_date、currency 和 exchange_rate。使用选择值对字段列表重新排序并删除不再需要的字段。

如您所见,现在我们有 2014-01-03 和 2014-01-04 的数据,使用之前的已知值。

【讨论】:

  • +1 仅使用现有组件。实际上,我也看过克隆和分析组件,但我无法将它们完全放在一起。
  • 这其实很好。我想我应该把它作为我第一个问题的答案来宣传。
  • 非常感谢你,伙计。你不知道这有多大帮助。希望我能给你超过 +1 :)
  • 我想说,计算汇率的缺失日期是许多领域的常见问题。感谢您提供这个优雅的解决方案。帮了大忙。
  • 太棒了,非常感谢伙计。我还用它来填充计算斜率的值。
【解决方案2】:

虽然这不完全符合您的要求,但您可以通过使用具有通用功能的 用户定义的 Java 类 组件来实现您的目标。将您的步骤Get previous row fieldsNon-values in row 替换为此组件的单个实例。在该组件的 Classes - Processor 部分中插入以下代码:

Object[] previousRow;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // First, get a row from the default input hop
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    if (r == null) {
      setOutputDone();
      return false;
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

    // copy all input fields to the output fields

    for (int i=0; i < getInputRowMeta().size(); i++) {
        logBasic(data.inputRowMeta.getString(r, i));
        if (data.inputRowMeta.getString(r, i) == null && (previousRow != null))  {
            // if the current field is empty take it from the previous row
            outputRow[i] = previousRow[i];
        }   
        else {
            // otherwise use the current row
            outputRow[i] = r[i];
        }

    }

    putRow(data.outputRowMeta, outputRow);
    // store the current row as future previous row
    previousRow = data.outputRowMeta.cloneRow(outputRow);

    return true;
}

Janino 类始终保留上一行的副本以填充当前行的空白字段。

以下测试设置演示了组件的使用。在最简单的情况下,我们处理从 CSV 文件读取的流:

输入文件配置如下:

并包含以下数据

NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014;12,5
2;B;;13,5
;;03.12.2001;
4;;;
5;C;;
6;;20.03.2005;18,2
7;D;;

用户定义的Java类组件的配置如下:

输出文本文件包含“无间隙”的增强行:

NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014; 012,50
2;B;01.02.2014; 013,50
2;B;03.12.2001; 013,50
4;B;03.12.2001; 013,50
5;C;03.12.2001; 013,50
6;C;20.03.2005; 018,20
7;D;20.03.2005; 018,20

注意:

  • 该组件已针对这四种数据类型进行了测试,但原则上它应该适用于所有数据类型。
  • 它与实际的字段数无关。
  • 一旦填写了某个字段,就永远不会“未填写”,这对您的设置(我猜)很好,但这可能不适用于其他设置。
  • 该机制仅在字段为null 时有效。仅包含空格的字符串可能会破坏它,因此请确保在将所有字符串通过管道传递到组件之前对其进行修剪。

代码是使用http://wiki.pentaho.com/display/EAI/User+Defined+Java+Class作为教程编写的。

附录

@manu 提供的链接包含以下代码。它包含对数字格式的特定处理。请注意,它不再是完全通用的。

Object[] previousRow;
RowMetaInterface outputMeta;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // First, get a row from the default input hop
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    if (r == null) {
        setOutputDone();
        return false;
    }

    if (outputMeta == null) {
        outputMeta = data.outputRowMeta.clone();
    for(int i=0; i < outputMeta.size(); i++) {
        ValueMetaInterface meta = outputMeta.getValueMeta(i);
        if (meta.getType() == ValueMetaInterface.TYPE_NUMBER) {
            meta.setPrecision(4);
            meta.setConversionMask("#.####");
        }
    }
}

// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

// copy all input fields to the output fields

for (int i=0; i < getInputRowMeta().size(); i++) {
    if ((r[i] == null) && (previousRow != null)) {
        // if the current field is empty take it from the previous row
        outputRow[i] = previousRow[i];
    }
    else {
        // otherwise use the current row
        outputRow[i] = r[i];
    }     
}

putRow(outputMeta, outputRow);
// store the current row as future previous row
previousRow = outputMeta.cloneRow(outputRow);

return true;
}

【讨论】:

  • 嗨马库斯,感谢您的提示。它是 Java,但我会管理。提供的脚本无法正常工作,因为 data.inputRowMeta.getString(r, i) 对于非字符串字段(例如我的货币汇率数据)将始终为空。我已经尝试了 getNumber ,甚至用isNull(r, i) 替换了条件,但它不会工作,或者第一行值被永远复制。在gist.github.com/mvaled/eb5e043e0d625c824751 中,我展示了我正在尝试的当前方法。
  • Scratch last comment... 要点中的解决方案有效,但我看到的数字精度错误,所以它们都显示 1.4 而不是 1.3845(例如),即。它们被四舍五入到小数点后一位。所以我必须弄清楚如何改变输出字段的精度......
  • 我设法打印了#.#### 数字。要点更新。谢谢。
  • @manu:你不认为格式问题可以在前面或后面的组件中解决吗?通用 Java 类组件应该只是在类型进入时“转发”它们。
  • 这些输出字段的来源是Group By 步骤。似乎将我所有数字字段的格式推断为“#.#;-#.#”,至少使用 Spoon (pdi-ce-4.4.0-stable) 我无法更改这些填充的元数据以更改格式...我想我可以按照您建议的“在 Janino 课程之外”进行操作,但可以找到在哪里。这就是为什么我必须这样做...如果您找到任何解决方案,请告诉我。
猜你喜欢
  • 2020-02-17
  • 1970-01-01
  • 2018-10-28
  • 1970-01-01
  • 2014-12-17
  • 2015-04-12
  • 2020-09-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多