【问题标题】:PIG- calculate difference between records by group using python UDF (window functions)PIG-使用python UDF(窗口函数)按组计算记录之间的差异
【发布时间】:2017-06-23 00:29:48
【问题描述】:

我正在尝试按组计算记录之间的差异,并按组包括行号。这可以通过使用窗口函数的 HIVE 中的滞后和行号函数来完成。尝试使用 PIG 和 python UDF 重新创建它。

在下面的示例中,我需要每个名称的行号从 1 重新开始,并为新的月份(新记录)递增。另外,我需要每个名字的余额与上个月的差额。

输入数据

name    month   balance
A   1   10
A   2   5
A   3   15
B   2   20
B   3   10
B   4   45
B   5   50

输出数据

name    month   balance row_number  balance_diff
A   1   10  1   0
A   2   5   1   -5
A   3   15  3   10
B   2   20  1   0
B   3   10  2   -10
B   4   45  3   35
B   5   50  4   5

如何使用 PIG 和 python UDF 做到这一点?以下是我尝试过的。

output = foreach (group input by (name)) {
    sorted = order input BY month asc;
    row_details= myudf.rownum_and_diff(sorted.(month, balance));
    generate flatten (sorted), flatten (row_details));
    };

Python UDF

def row_num(mth):
    return [x+1 for x,y in enumerate (mth)]

def diff(bal, n=1):
    return [x-y if (x is not None and y is not None) else 0.0 \
        for x,y in zip(bal, [:n] + bal)]

@outputSchema('udfbag:bag{udftuple:tuple(row_number: int, balance_diff: int)}')

def row_metrics(mthbal):
    mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return zip(row_number, balance_diff)

我的 python 函数有效。但是,将结果带入 PIG 后,我无法合并两个包(排序和 row_detail)。非常感谢任何帮助。

我还看到 PIG 中的 enumerate 函数对行号进行了我想要的操作。然而,作为学习 PIG 的一部分,我正在寻找使用 python UDF 的解决方案。

【问题讨论】:

  • 这段代码似乎有错误。 python代码中[:n]有错误,rownum_and_diff应该是row_metrics
  • 嗨@Dhanseh,感谢您发现myudf 上的错误。 row_num_and_diff - 忘记重命名它。我的 diff 函数的原始代码工作正常。即使我必须根据两三个月前的余额计算余额差,我也在尝试使用相同的功能。无论 n 的值如何(这是我想要的),我的原始函数都会返回相同数量的元素。但是,我注意到当 n>1 时,您的代码返回的元素较少。再次感谢您的帮助。

标签: python apache-pig window-functions udf enumerate


【解决方案1】:

试试这个。

Python UDF:

def row_num(mth):
    return [x+1 for x,y in enumerate (mth)]

def diff(bal, n=1):
    return [0]+[x-y for x,y in zip(bal[n:],bal[:-n])]


@outputSchema('udfbag:bag{udftuple:tuple(name: chararray, mth: int, row_number: int, balance_diff: int)}')

def row_metrics(mthbal):
    name, mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return zip(name,mth,row_number, balance_diff)

猪脚本:

register 'myudf.py' using jython as myudf;
inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

outdat = foreach (group inpdat by name) {
    sorted = order inpdat BY month asc;
    row_details = myudf.row_metrics(sorted);
    generate flatten (row_details);
    };

dump outdat;

【讨论】:

  • 感谢您发布解决方案。虽然这可行,但我试图避免这种解决方案。原因是,我的实际关系中有很多字段。当您只使用 2 个字段进行计算以使用 python UDF 创建更多计算字段时,将整个字段发送到 python 并将它们全部带回 pig 和一些额外的计算字段是没有意义的。我终于找到了解决方案(发布在下面),但是,由于我是 PIG 初学者,很想知道是否有其他方法可以解决这个问题。
【解决方案2】:

在我的情况下,使用 piggybank 的缝合功能有效。有兴趣了解任何其他方法来做到这一点。

REGISTER /mypath/piggybank.jar;
define Stitch org.apache.pig.piggybank.evaluation.Stitch;

input = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

output = FOREACH (group input by name) { 
sorted = ORDER input by month asc; 
udf_fields = myudf.row_metrics(sorted.(month, balance));
generate flatten(Stitch(sorted,udf_fields)) as (name, month, balance, row_number, balance_diff);
};

【讨论】:

    猜你喜欢
    • 2019-04-07
    • 2013-02-02
    • 2023-04-09
    • 1970-01-01
    • 2021-03-18
    • 2019-02-17
    • 1970-01-01
    • 1970-01-01
    • 2012-11-08
    相关资源
    最近更新 更多