Question title: How to compute the sum of orders over a 12-month period sliding by 1 month per customer in Spark
Posted: 2018-05-11 21:34:11
Question:

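I'm relatively new to Scala. I'm currently trying to aggregate order data in Spark over a 12-month period that slides by one month.

Below is a simple sample of my data, formatted so you can test it easily: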

import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


var sample = Seq(("C1","01/01/2016", 20), ("C1","02/01/2016", 5), 
 ("C1","03/01/2016", 2),  ("C1","04/01/2016", 3), ("C1","05/01/2017", 5),
 ("C1","08/01/2017", 5), ("C1","01/02/2017", 10), ("C1","01/02/2017", 10),  
 ("C1","01/03/2017", 10)).toDF("id","order_date", "orders")

sample = sample.withColumn("order_date",
to_date(unix_timestamp($"order_date", "dd/MM/yyyy").cast("timestamp")))

sample.show 
 +---+----------+------+
 | id|order_date|orders|
 +---+----------+------+
 | C1|2016-01-01|    20|
 | C1|2016-01-02|     5|
 | C1|2016-01-03|     2|
 | C1|2016-01-04|     3|
 | C1|2017-01-05|     5|
 | C1|2017-01-08|     5|
 | C1|2017-02-01|    10|
 | C1|2017-02-01|    10|
 | C1|2017-03-01|    10|
 +---+----------+------+
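The pattern dd/MM/yyyy reads the day first, which is why "02/01/2016" shows up as 2016-01-02 above. A minimal plain-Scala check of that interpretation with java.time (no Spark needed; `DateParsing` and `parseOrderDate` are hypothetical names):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DateParsing {
  // Same pattern as the unix_timestamp call above: day, then month, then year.
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  // Hypothetical helper: parse one raw order_date string.
  def parseOrderDate(s: String): LocalDate = LocalDate.parse(s, fmt)

  val parsed: Seq[LocalDate] =
    Seq("01/01/2016", "02/01/2016", "05/01/2017", "01/02/2017").map(parseOrderDate)
}
```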

The result I'm required to produce is the following:

id      period_start    period_end  rolling
C1      2015-01-01      2016-01-01  30
C1      2016-01-01      2017-01-01  40
C1      2016-02-01      2017-02-01  30
C1      2016-03-01      2017-03-01  40
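Working the sample through by hand suggests that each period includes both its start month and its end month, i.e. 13 calendar months (for example, the 2016-01-01 to 2017-01-01 row is 30 from January 2016 plus 10 from January 2017). The plain-Scala sketch below (no Spark; `ExpectedResult` is a hypothetical name) reproduces the table under that assumption:

```scala
import java.time.LocalDate

object ExpectedResult {
  // The sample rows after parsing: (id, order_date, orders).
  val rows: Seq[(String, LocalDate, Int)] = Seq(
    ("C1", LocalDate.of(2016, 1, 1), 20), ("C1", LocalDate.of(2016, 1, 2), 5),
    ("C1", LocalDate.of(2016, 1, 3), 2), ("C1", LocalDate.of(2016, 1, 4), 3),
    ("C1", LocalDate.of(2017, 1, 5), 5), ("C1", LocalDate.of(2017, 1, 8), 5),
    ("C1", LocalDate.of(2017, 2, 1), 10), ("C1", LocalDate.of(2017, 2, 1), 10),
    ("C1", LocalDate.of(2017, 3, 1), 10))

  // One output row per month that has orders. A row's order month must lie
  // between period_start and period_end, both inclusive.
  val result: Seq[(String, LocalDate, LocalDate, Int)] = {
    val ends = rows.map { case (id, d, _) => (id, d.withDayOfMonth(1)) }
      .distinct
      .sortBy(_._2.toEpochDay)
    ends.map { case (id, end) =>
      val start = end.minusMonths(12)
      val total = rows.collect {
        case (rid, d, n) if rid == id &&
          !d.withDayOfMonth(1).isBefore(start) && !d.withDayOfMonth(1).isAfter(end) => n
      }.sum
      (id, start, end, total)
    }
  }
}
```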

What I've tried so far

I collapse each customer's dates to the first day of the month

(i.e. 2016-01-[1..31] >> 2016-01-01)
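If you do keep a UDF for this step, the collapse can be written without Joda and without starting from the current date (new DateTime() begins at "now" and then overwrites fields). A sketch with java.time; `CollapseMonth` and `firstOfMonth` are hypothetical names, and `collapseMonth` only mirrors the shape of the lambda below:

```scala
import java.time.LocalDate

object CollapseMonth {
  // Same inputs as the question's collapse_month lambda (month, year):
  // build the first day of that month directly and render it as yyyy-MM-dd.
  val collapseMonth: (Int, Int) => String =
    (month, year) => LocalDate.of(year, month, 1).toString

  // The same collapse, starting from a full date instead of its parts.
  def firstOfMonth(d: LocalDate): LocalDate = d.withDayOfMonth(1)
}
```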

import org.joda.time._

val collapse_month = (month:Integer, year:Integer ) => {
   var  dt = new DateTime().withYear(year)
                        .withMonthOfYear(month)
                        .withDayOfMonth(1)
   dt.toString("yyyy-MM-dd")
 }

val collapse_month_udf = udf(collapse_month)


sample = sample.withColumn("period_end",
           collapse_month_udf(
           month(col("order_date")),
           year(col("order_date"))
           ).as("date"))

sample.groupBy($"id",  $"period_end")
              .agg(sum($"orders").as("orders"))
              .orderBy("period_end").show
 +---+----------+------+
 | id|period_end|orders|
 +---+----------+------+
 | C1|2016-01-01|    30|
 | C1|2017-01-01|    10|
 | C1|2017-02-01|    20|
 | C1|2017-03-01|    10|
 +---+----------+------+

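I tried the provided window function, but I couldn't find an option for a 12-month window sliding by one month.

I'm really not sure what the best approach is from here, given the amount of data I have to process; I need something that won't take 5 hours.

Any help would be greatly appreciated.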


    Tags: scala apache-spark apache-spark-sql aggregation


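    Answer 1:

    "I tried the provided window function, but I couldn't find an option for a 12-month sliding window."

    You can still use window with longer intervals, but all parameters have to be expressed in days or weeks: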

    window($"order_date", "365 days", "28 days")
    

    Unfortunately, window doesn't respect month or year boundaries, so it is not that useful for you.

    Personally, I would aggregate the data first:
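To see why fixed-length windows don't line up with months: a slide of "28 days" moves every window start by exactly 28 days, so starts drift away from the first of the month after a single step. Spark anchors its window grid at the epoch (shiftable with the optional startTime argument of window), but the drift is the same wherever the grid starts. A plain-Scala illustration, with a hypothetical starting date and names:

```scala
import java.time.LocalDate

object FixedDayWindows {
  // Window starts advance by a fixed number of days, not by calendar months.
  def slideStarts(first: LocalDate, stepDays: Long, n: Int): Seq[LocalDate] =
    (0 until n).map(i => first.plusDays(stepDays * i))

  // Starting on 2016-01-01 (hypothetical anchor) with a 28-day slide.
  val starts: Seq[LocalDate] = slideStarts(LocalDate.of(2016, 1, 1), 28, 4)
}
```

The starts come out as 2016-01-01, 2016-01-29, 2016-02-26 and 2016-03-25, so none after the first is a month start.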

    val byMonth = sample
      .groupBy($"id", trunc($"order_date", "month").alias("order_month"))
      .agg(sum($"orders").alias("orders"))
    
    +---+-----------+------+
    | id|order_month|orders|
    +---+-----------+------+
    | C1| 2017-01-01|    10|
    | C1| 2016-01-01|    30|
    | C1| 2017-02-01|    20|
    | C1| 2017-03-01|    10|
    +---+-----------+------+
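Conceptually, the aggregation keys every row by (id, first day of its month) and sums the orders per key; groupBy doesn't sort, which is why the rows above come out in no particular order. A plain-Scala sketch of the same grouping (no Spark; `ByMonth` is a hypothetical name):

```scala
import java.time.LocalDate

object ByMonth {
  val sample: Seq[(String, LocalDate, Int)] = Seq(
    ("C1", LocalDate.of(2016, 1, 1), 20), ("C1", LocalDate.of(2016, 1, 2), 5),
    ("C1", LocalDate.of(2016, 1, 3), 2), ("C1", LocalDate.of(2016, 1, 4), 3),
    ("C1", LocalDate.of(2017, 1, 5), 5), ("C1", LocalDate.of(2017, 1, 8), 5),
    ("C1", LocalDate.of(2017, 2, 1), 10), ("C1", LocalDate.of(2017, 2, 1), 10),
    ("C1", LocalDate.of(2017, 3, 1), 10))

  // Counterpart of groupBy($"id", trunc($"order_date", "month")).agg(sum($"orders")).
  val byMonth: Map[(String, LocalDate), Int] =
    sample.groupBy { case (id, d, _) => (id, d.withDayOfMonth(1)) }
      .map { case (key, rs) => key -> rs.map(_._3).sum }
}
```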
    

    Create a reference date range:

    import java.time.temporal.ChronoUnit
    
    val Row(start: java.sql.Date, end: java.sql.Date) = byMonth
      .select(min($"order_month"), max($"order_month"))
      .first
    
    val months = (0L to ChronoUnit.MONTHS.between(
        start.toLocalDate, end.toLocalDate))
      .map(i => java.sql.Date.valueOf(start.toLocalDate.plusMonths(i)))
      .toDF("order_month")
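Because both ends are already truncated to the first of the month, ChronoUnit.MONTHS.between counts whole months exactly, and stepping with plusMonths from the start yields every month in between, both ends included. The same arithmetic in plain Scala (`MonthRange` is a hypothetical name):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object MonthRange {
  // Every first-of-month from start to end, inclusive.
  def monthRange(start: LocalDate, end: LocalDate): Seq[LocalDate] =
    (0L to ChronoUnit.MONTHS.between(start, end)).map(i => start.plusMonths(i))

  // Bounds of the sample data: 2016-01-01 .. 2017-03-01, i.e. 15 months.
  val months: Seq[LocalDate] = monthRange(LocalDate.of(2016, 1, 1), LocalDate.of(2017, 3, 1))
}
```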
    

    combine it with the unique ids:

    val ref = byMonth.select($"id").distinct.crossJoin(months)
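The cross join pairs every distinct id with every month of the reference range, so the reference has (number of ids) × (number of months) rows. A plain-Scala illustration; the second customer "C2" is hypothetical and only there to show the multiplication:

```scala
object CrossJoin {
  // Distinct ids, as in byMonth.select($"id").distinct ("C2" is hypothetical).
  val ids: Seq[String] = Seq("C1", "C1", "C2").distinct
  val months: Seq[String] = Seq("2016-01-01", "2016-02-01", "2016-03-01")

  // Every (id, month) combination, like crossJoin(months).
  val ref: Seq[(String, String)] = for { id <- ids; m <- months } yield (id, m)
}
```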
    

    and join it back with the source:

    val expanded = ref.join(byMonth, Seq("id", "order_month"), "leftouter")
    
    +---+-----------+------+ 
    | id|order_month|orders|
    +---+-----------+------+
    | C1| 2016-01-01|    30|
    | C1| 2016-02-01|  null|
    | C1| 2016-03-01|  null|
    | C1| 2016-04-01|  null|
    | C1| 2016-05-01|  null|
    | C1| 2016-06-01|  null|
    | C1| 2016-07-01|  null|
    | C1| 2016-08-01|  null|
    | C1| 2016-09-01|  null|
    | C1| 2016-10-01|  null|
    | C1| 2016-11-01|  null|
    | C1| 2016-12-01|  null|
    | C1| 2017-01-01|    10|
    | C1| 2017-02-01|    20|
    | C1| 2017-03-01|    10|
    +---+-----------+------+
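The left outer join keeps every reference row and attaches the monthly total where one exists; months without orders get null. In plain Scala the same idea is a map lookup that returns an Option (hypothetical names, a shortened reference range):

```scala
object LeftOuter {
  val ref: Seq[(String, String)] =
    Seq("2016-01-01", "2016-02-01", "2017-01-01").map(m => ("C1", m))
  val byMonth: Map[(String, String), Long] =
    Map(("C1", "2016-01-01") -> 30L, ("C1", "2017-01-01") -> 10L)

  // Every reference row survives; None plays the role of null in Spark.
  val expanded: Seq[(String, String, Option[Long])] =
    ref.map { case (id, m) => (id, m, byMonth.get((id, m))) }
}
```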
    

    With the data prepared this way, you can use a window function:

    import org.apache.spark.sql.expressions.Window
    
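    val w = Window.partitionBy($"id")
      .orderBy($"order_month")
      // 12 preceding rows plus the current one = 13 consecutive months,
      // which is the span the expected output covers
      .rowsBetween(-12, Window.currentRow)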
    
    expanded.withColumn("rolling", sum("orders").over(w))
      .na.drop(Seq("orders"))
      .select(
          $"order_month" - expr("INTERVAL 12 MONTHS") as "period_start",
          $"order_month" as "period_end",
          $"rolling")
    
    +------------+----------+-------+
    |period_start|period_end|rolling|
    +------------+----------+-------+
    |  2015-01-01|2016-01-01|     30|
    |  2016-01-01|2017-01-01|     40|
    |  2016-02-01|2017-02-01|     30|
    |  2016-03-01|2017-03-01|     40|
    +------------+----------+-------+
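A rows frame counts rows, not months, which is why the series has to be dense first: with gaps, "12 preceding rows" could reach back further than 12 months. A plain-Scala sketch of the same frame over the expanded series (hypothetical names; flatten skips None the way Spark's sum skips nulls):

```scala
object RowsFrame {
  // Dense monthly series for C1, 2016-01 .. 2017-03 (15 rows); None = no orders.
  val orders: IndexedSeq[Option[Long]] =
    IndexedSeq[Option[Long]](Some(30L)) ++
      IndexedSeq.fill[Option[Long]](11)(None) ++
      IndexedSeq[Option[Long]](Some(10L), Some(20L), Some(10L))

  // ROWS BETWEEN 12 PRECEDING AND CURRENT ROW: up to 13 rows ending at i.
  def rolling(i: Int): Long = orders.slice(math.max(0, i - 12), i + 1).flatten.sum

  // na.drop(Seq("orders")): keep only months that actually had orders.
  val result: Seq[Long] = orders.indices.filter(i => orders(i).isDefined).map(rolling)
}
```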
    

    Please note that this is a very expensive operation that requires at least two shuffles:

    == Physical Plan ==
    *Project [cast(cast(order_month#104 as timestamp) - interval 1 years as date) AS period_start#1387, order_month#104 AS period_end#1388, rolling#1375L]
    +- *Filter AtLeastNNulls(n, orders#55L)
       +- Window [sum(orders#55L) windowspecdefinition(id#7, order_month#104 ASC NULLS FIRST, ROWS BETWEEN 12 PRECEDING AND CURRENT ROW) AS rolling#1375L], [id#7], [order_month#104 ASC NULLS FIRST]
          +- *Sort [id#7 ASC NULLS FIRST, order_month#104 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(id#7, 200)
                +- *Project [id#7, order_month#104, orders#55L]
                   +- *BroadcastHashJoin [id#7, order_month#104], [id#181, order_month#49], LeftOuter, BuildRight
                      :- BroadcastNestedLoopJoin BuildRight, Cross
                      :  :- *HashAggregate(keys=[id#7], functions=[])
                      :  :  +- Exchange hashpartitioning(id#7, 200)
                      :  :     +- *HashAggregate(keys=[id#7], functions=[])
                      :  :        +- *HashAggregate(keys=[id#7, trunc(order_date#14, month)#1394], functions=[])
                      :  :           +- Exchange hashpartitioning(id#7, trunc(order_date#14, month)#1394, 200)
                      :  :              +- *HashAggregate(keys=[id#7, trunc(order_date#14, month) AS trunc(order_date#14, month)#1394], functions=[])
                      :  :                 +- LocalTableScan [id#7, order_date#14]
                      :  +- BroadcastExchange IdentityBroadcastMode
                      :     +- LocalTableScan [order_month#104]
                      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, date, true]))
                         +- *HashAggregate(keys=[id#181, trunc(order_date#14, month)#1395], functions=[sum(cast(orders#183 as bigint))])
                            +- Exchange hashpartitioning(id#181, trunc(order_date#14, month)#1395, 200)
                               +- *HashAggregate(keys=[id#181, trunc(order_date#14, month) AS trunc(order_date#14, month)#1395], functions=[partial_sum(cast(orders#183 as bigint))])
                                  +- LocalTableScan [id#181, order_date#14, orders#183]
    

    This can also be expressed with a rangeBetween frame, but you have to encode the data first:

    val encoded = byMonth
      .withColumn("order_month_offset",
          // Choose "zero" date appropriate in your scenario
          months_between($"order_month", to_date(lit("1970-01-01"))))
    
    
    val w = Window.partitionBy($"id")
      .orderBy($"order_month_offset")
      .rangeBetween(-12, Window.currentRow)
    
    encoded.withColumn("rolling", sum($"orders").over(w))
    
    +---+-----------+------+------------------+-------+                             
    | id|order_month|orders|order_month_offset|rolling|
    +---+-----------+------+------------------+-------+
    | C1| 2016-01-01|    30|             552.0|     30|
    | C1| 2017-01-01|    10|             564.0|     40|
    | C1| 2017-02-01|    20|             565.0|     30|
    | C1| 2017-03-01|    10|             566.0|     40|
    +---+-----------+------+------------------+-------+
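With a range frame, the bounds are compared against the ordering value (the month offset) rather than row positions, so months missing from the data need no placeholder rows. A plain-Scala sketch of that logic (hypothetical names); it computes whole-month offsets as Longs with ChronoUnit, whereas Spark's months_between returns a double (552.0 above), and the two agree when both dates are the first of a month:

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object RangeFrame {
  // Sparse monthly totals for C1, only months with orders (like byMonth).
  val byMonth: Seq[(LocalDate, Long)] = Seq(
    LocalDate.of(2016, 1, 1) -> 30L, LocalDate.of(2017, 1, 1) -> 10L,
    LocalDate.of(2017, 2, 1) -> 20L, LocalDate.of(2017, 3, 1) -> 10L)

  // Whole-month offset from a fixed "zero" date, like order_month_offset.
  private val zero = LocalDate.of(1970, 1, 1)
  val encoded: Seq[(Long, Long)] =
    byMonth.map { case (m, n) => (ChronoUnit.MONTHS.between(zero, m), n) }

  // RANGE BETWEEN 12 PRECEDING AND CURRENT ROW on the offset value.
  val rolling: Seq[(Long, Long)] = encoded.map { case (off, _) =>
    off -> encoded.collect { case (o, n) if o >= off - 12 && o <= off => n }.sum
  }
}
```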
    

    This makes the join with the reference obsolete and simplifies the execution plan.
