【问题标题】:Spark Scala Window extend result until the endSpark Scala Window 扩展结果直到结束
【发布时间】:2019-09-27 16:26:34
【问题描述】:

我将根据初始数据框和我想要实现的数据框揭示我的问题:

val df_997 = Seq [(Int, Int, Int, Int)]((1,1,7,10),(1,10,4,300),(1,3,14,50),(1,20,24,70),(1,30,12,90),(2,10,4,900),(2,25,30,40),(2,15,21,60),(2,5,10,80)).toDF("policyId","FECMVTO","aux","IND_DEF").orderBy(asc("policyId"), asc("FECMVTO"))
df_997.show
+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
|       1|      1|  7|     10|
|       1|      3| 14|     50|
|       1|     10|  4|    300|
|       1|     20| 24|     70|
|       1|     30| 12|     90|
|       2|      5| 10|     80|
|       2|     10|  4|    900|
|       2|     15| 21|     60|
|       2|     25| 30|     40|
+--------+-------+---+-------+

假设我已经通过列 policyId 对这个 DF 进行了分区,并基于它创建了列 row_num 以便更好地查看 Windows:

val win = Window.partitionBy("policyId").orderBy("FECMVTO")

val df_998 = df_997.withColumn("row_num",row_number().over(win))
df_998.show
+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|     10|  4|    300|      3|
|       1|     20| 24|     70|      4|
|       1|     30| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|     10|  4|    900|      2|
|       2|     15| 21|     60|      3|
|       2|     25| 30|     40|      4|
+--------+-------+---+-------+-------+

现在,对于每个窗口,如果 aux 的值为 4,我想将该寄存器的 IND_DEF 列的值设置为该寄存器的 FEC_MVTO 列,直到窗口结束。

生成的 DF 将是:

+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|    300|  4|    300|      3|
|       1|    300| 24|     70|      4|
|       1|    300| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|    900|  4|    900|      2|
|       2|    900| 21|     60|      3|
|       2|    900| 30|     40|      4|
+--------+-------+---+-------+-------+

感谢您的建议,因为我很困在这里......

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    这是一种方法:首先left-加入带有aux == 4过滤版本的DataFrame,然后应用窗口函数first以每个分区所需的IND_DEF值回填nulls,最后有条件地重新创建列FECMVTO

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    val df = Seq(
      (1,1,7,10), (1,10,4,300), (1,3,14,50), (1,20,24,70), (1,30,12,90), 
      (2,10,4,900), (2,25,30,40), (2,15,21,60), (2,5,10,80)
    ).toDF("policyId","FECMVTO","aux","IND_DEF")
    
    val win = Window.partitionBy("policyId").orderBy("FECMVTO").
      rowsBetween(Window.unboundedPreceding, 0)
    
    val df2 = df.
      select($"policyId", $"aux", $"IND_DEF".as("IND_DEF2")).
      where($"aux" === 4)
    
    df.join(df2, Seq("policyId", "aux"), "left_outer").
      withColumn("IND_DEF3", first($"IND_DEF2", ignoreNulls=true).over(win)).
      withColumn("FECMVTO", coalesce($"IND_DEF3", $"FECMVTO")).
      show
    // +--------+---+-------+-------+--------+--------+
    // |policyId|aux|FECMVTO|IND_DEF|IND_DEF2|IND_DEF3|
    // +--------+---+-------+-------+--------+--------+
    // |       1|  7|      1|     10|    null|    null|
    // |       1| 14|      3|     50|    null|    null|
    // |       1|  4|    300|    300|     300|     300|
    // |       1| 24|    300|     70|    null|     300|
    // |       1| 12|    300|     90|    null|     300|
    // |       2| 10|      5|     80|    null|    null|
    // |       2|  4|    900|    900|     900|     900|
    // |       2| 21|    900|     60|    null|     900|
    // |       2| 30|    900|     40|    null|     900|
    // +--------+---+-------+-------+--------+--------+
    

    IND_DEF2IND_DEF3 列仅用于说明(当然可以删除)。

    【讨论】:

    • 这个解决方案是有效的,谢谢,但是在处理大量数据时效率很低:)
    • @Javier de la Iglesia,鉴于必须根据条件动态识别分区中的第一行以将列填充到末尾的特定要求,我怀疑任何可行的解决方案都不会在规模上非常经济。为了提高性能,您可能还需要研究resource allocationdata skew problem 等领域。
    【解决方案2】:
    #I believe below can be solution for your issue
    
    Considering input_df is your input dataframe
    
    //Step#1 - Filter rows with IND_DEF = 4 from input_df
    val only_FECMVTO_4_df1 = input_df.filter($"IND_DEF" === 4)
    //Step#2 - Filling FECMVTO value from  IND_DEF for the above result
    val only_FECMVTO_4_df2 = only_FECMVTO_4_df1.withColumn("FECMVTO_NEW",$"IND_DEF").drop($"FECMVTO").withColumnRenamed("FECMVTO",$"FECMVTO_NEW")
    //Step#3 - removing all the records from step#1 from input_df
    val input_df_without_FECMVTO_4 = input_df.except(only_FECMVTO_4_df1)
    //combining Step#2 output with output of Step#3
    val final_df = input_df_without_FECMVTO_4.union(only_FECMVTO_4_df2)
    

    【讨论】:

      猜你喜欢
      • 2021-01-05
      • 2017-09-30
      • 1970-01-01
      • 2015-10-03
      • 2020-08-24
      • 2018-04-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多