【问题标题】:How to compute "custom running total" in spark 1.5 dataframe如何在 Spark 1.5 数据框中计算“自定义运行总计”
【发布时间】:2016-03-21 16:00:43
【问题描述】:

我有一个存储在 parquet 文件中的每个 LoanId 的贷款支付历史记录,并尝试计算每个贷款每个期间的“逾期”金额。 如果不是计算到期金额的棘手性质,这将是一个简单的窗口分区任务。

如果客户付款少于到期金额,则逾期金额会增加,另一方面,如果客户预付款,则在后续期间忽略额外付款(下例中的第 5 行和第 6 行)。

LoanID  Period  DueAmt  ActualPmt   PastDue
1       1       100     100             0
1       2       100     60              -40
1       3       100     100             -40
1       4       100     200             0   <== This advance payment is not rolled to next period
1       5       100     110             0   <== This advance payment is not rolled to next period
1       6       100     80              -20
1       7       100     60              -60
1       8       100     100             -60
2       1       150     150             0
2       2       150     150             0
2       3       150     150             0
3       1       200     200             0
3       2       200     120             -80
3       3       200     120             -160

为了解决这个问题,我实际上需要为每个按期间排序的分区(LoanID)应用自定义函数。

spark 中有哪些可用选项。

简单但复杂似乎使用DF-> RDD-> groupby,将lambda转换回数据帧。

更优雅的是自定义 UDAF(在 scala 中?)带有 window 功能,但找不到单个实现示例。


好的,所以我尝试了第一个解决方案,从 Dataframe 到 Pair RDD 并返回

    from pyspark.sql import Row 
    def dueAmt(partition):
        '''
        @type partition:list 
        '''
        #first sort rows
        sp=sorted(partition, key=lambda r: r.Period )
        res=[]
        due=0
        for r in sp:
            due+=r.ActualPmt-r.DueAmt
            if due>0: due=0;
            #row is immutable so we need to create new row with updated value
            d=r.asDict()
            d['CalcDueAmt']=-due
            newRow=Row(**d)
            res.append(newRow)
        return res    

    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('PmtDueSample2.csv').cache()
    rd1=df.rdd.map(lambda r: (r.LoanID, r ) )
    rd2=rd1.groupByKey()
    rd3=rd2.mapValues(dueAmt)
    rd4=rd3.flatMap(lambda t: t[1] )
    df2=rd4.toDF()

似乎有效。

在这段旅程中,我实际上发现了 pyspark 实现中的几个错误。

  1. 在 Row 类中实现____call____ 是错误的。
  2. 该行的构造函数中的烦人错误。没有明显的原因____new____排序 列,所以在旅程结束时,我的结果表有列 按字母顺序排列。这只会让人更难看 最终结果。

【问题讨论】:

    标签: python scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    既不漂亮也不高效,但应该给你一些可以使用的东西。让我们从创建和注册表开始:

    val df = sc.parallelize(Seq(
      (1, 1, 100, 100), (1, 2, 100, 60), (1, 3, 100, 100),
      (1, 4, 100, 200), (1, 5, 100, 110), (1, 6, 100, 80),
      (1, 7, 100, 60), (1, 8, 100, 100), (2, 1, 150, 150),
      (2, 2, 150, 150), (2, 3, 150, 150), (3, 1, 200, 200),
      (3, 2, 200, 120), (3, 3, 200, 120)
    )).toDF("LoanID", "Period", "DueAmt", "ActualPmt")
    
    df.registerTempTable("df")
    

    接下来让我们定义和注册一个 UDF:

    case class Record(period: Int, dueAmt: Int, actualPmt: Int, pastDue: Int)
    
    def runningPastDue(idxs: Seq[Int], dues: Seq[Int], pmts: Seq[Int]) = {
      def f(acc: List[(Int, Int, Int, Int)], x: (Int, (Int, Int))) = 
        (acc.head, x) match {
          case ((_, _, _, pastDue), (idx, (due, pmt))) => 
            (idx, due, pmt, (pmt - due + pastDue).min(0)) :: acc
        }
    
      idxs.zip(dues.zip(pmts))
        .toList
        .sortBy(_._1)
        .foldLeft(List((0, 0, 0, 0)))(f)
        .reverse
        .tail
        .map{ case (i, due, pmt, past) => Record(i, due, pmt, past) }
    }
    
    sqlContext.udf.register("runningPastDue", runningPastDue _)
    

    聚合并计算总和:

    val aggregated = sqlContext.sql("""
      SELECT LoanID, explode(pmts) pmts FROM (
        SELECT LoanId, 
               runningPastDue(
                 collect_list(Period), 
                 collect_list(DueAmt), 
                 collect_list(ActualPmt)
               ) pmts
        FROM df GROUP BY LoanID) tmp""")
    
    val flattenExprs = List("Period", "DueAmt", "ActualPmt", "PastDue")
      .zipWithIndex
      .map{case (c, i) => col(s"tmp._${i+1}").alias(c)}
    

    终于变平了:

    val result = aggregated.select($"LoanID" :: flattenExprs: _*)
    

    【讨论】:

    • 感谢您提供样品。我一直在寻找这种解决方案。我将把你的解决方案的真实数据的性能与 DF->RDD->DF 的性能进行比较。我会及时通知你
    • 不客气。不过我不是特别热情。 collect_list这里感觉不对。
    猜你喜欢
    • 2014-07-09
    • 1970-01-01
    • 1970-01-01
    • 2017-08-23
    • 2015-12-20
    • 2010-10-14
    • 1970-01-01
    • 2017-10-04
    相关资源
    最近更新 更多