【发布时间】:2016-03-21 16:00:43
【问题描述】:
我有一个存储在 parquet 文件中的每个 LoanId 的贷款支付历史记录,并尝试计算每个贷款每个期间的“逾期”金额。 如果不是计算到期金额的棘手性质,这将是一个简单的窗口分区任务。
如果客户付款少于到期金额,则逾期金额会增加,另一方面,如果客户预付款,则在后续期间忽略额外付款(下例中的第 5 行和第 6 行)。
LoanID Period DueAmt ActualPmt PastDue
1 1 100 100 0
1 2 100 60 -40
1 3 100 100 -40
1 4 100 200 0 <== This advance payment is not rolled to next period
1 5 100 110 0 <== This advance payment is not rolled to next period
1 6 100 80 -20
1 7 100 60 -60
1 8 100 100 -60
2 1 150 150 0
2 2 150 150 0
2 3 150 150 0
3 1 200 200 0
3 2 200 120 -80
3 3 200 120 -160
为了解决这个问题,我实际上需要为每个按期间排序的分区(LoanID)应用自定义函数。
spark 中有哪些可用选项。
简单但复杂似乎使用DF-> RDD-> groupby,将lambda转换回数据帧。
更优雅的是自定义 UDAF(在 scala 中?)带有 window 功能,但找不到单个实现示例。
好的,所以我尝试了第一个解决方案,从 Dataframe 到 Pair RDD 并返回
from pyspark.sql import Row
def dueAmt(partition):
'''
@type partition:list
'''
#first sort rows
sp=sorted(partition, key=lambda r: r.Period )
res=[]
due=0
for r in sp:
due+=r.ActualPmt-r.DueAmt
if due>0: due=0;
#row is immutable so we need to create new row with updated value
d=r.asDict()
d['CalcDueAmt']=-due
newRow=Row(**d)
res.append(newRow)
return res
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('PmtDueSample2.csv').cache()
rd1=df.rdd.map(lambda r: (r.LoanID, r ) )
rd2=rd1.groupByKey()
rd3=rd2.mapValues(dueAmt)
rd4=rd3.flatMap(lambda t: t[1] )
df2=rd4.toDF()
似乎有效。
在这段旅程中,我实际上发现了 pyspark 实现中的几个错误。
- 在 Row 类中实现____call____ 是错误的。
- 该行的构造函数中的烦人错误。没有明显的原因____new____排序 列,所以在旅程结束时,我的结果表有列 按字母顺序排列。这只会让人更难看 最终结果。
【问题讨论】:
标签: python scala apache-spark dataframe apache-spark-sql