【发布时间】:2017-06-23 00:29:48
【问题描述】:
我正在尝试按组计算记录之间的差异,并按组包括行号。这可以通过使用窗口函数的 HIVE 中的滞后和行号函数来完成。尝试使用 PIG 和 python UDF 重新创建它。
在下面的示例中,我需要每个名称的行号从 1 重新开始,并为新的月份(新记录)递增。另外,我需要每个名字的余额与上个月的差额。
输入数据
name month balance
A 1 10
A 2 5
A 3 15
B 2 20
B 3 10
B 4 45
B 5 50
输出数据
name month balance row_number balance_diff
A 1 10 1 0
A 2 5 1 -5
A 3 15 3 10
B 2 20 1 0
B 3 10 2 -10
B 4 45 3 35
B 5 50 4 5
如何使用 PIG 和 python UDF 做到这一点?以下是我尝试过的。
猪
output = foreach (group input by (name)) {
sorted = order input BY month asc;
row_details= myudf.rownum_and_diff(sorted.(month, balance));
generate flatten (sorted), flatten (row_details));
};
Python UDF
def row_num(mth):
return [x+1 for x,y in enumerate (mth)]
def diff(bal, n=1):
return [x-y if (x is not None and y is not None) else 0.0 \
for x,y in zip(bal, [:n] + bal)]
@outputSchema('udfbag:bag{udftuple:tuple(row_number: int, balance_diff: int)}')
def row_metrics(mthbal):
mth, bal = zip(*mthbal)
row_number = row_num(mth)
balance_diff = diff(bal)
return zip(row_number, balance_diff)
我的 python 函数有效。但是,将结果带入 PIG 后,我无法合并两个包(排序和 row_detail)。非常感谢任何帮助。
我还看到 PIG 中的 enumerate 函数对行号进行了我想要的操作。然而,作为学习 PIG 的一部分,我正在寻找使用 python UDF 的解决方案。
【问题讨论】:
-
这段代码似乎有错误。 python代码中
[:n]有错误,rownum_and_diff应该是row_metrics -
嗨@Dhanseh,感谢您发现myudf 上的错误。 row_num_and_diff - 忘记重命名它。我的 diff 函数的原始代码工作正常。即使我必须根据两三个月前的余额计算余额差,我也在尝试使用相同的功能。无论 n 的值如何(这是我想要的),我的原始函数都会返回相同数量的元素。但是,我注意到当 n>1 时,您的代码返回的元素较少。再次感谢您的帮助。
标签: python apache-pig window-functions udf enumerate