【问题标题】:Spark Dataframe filldownSpark 数据框填充
【发布时间】:2021-07-07 23:23:48
【问题描述】:

我想对数据框执行“填充”类型操作,以删除空值并确保最后一行是一种汇总行,其中包含基于 timestamp 的每列的最后一个已知值,由itemId 分组。当我使用 Azure Synapse Notebooks 时,语言可以是 Scala、Pyspark、SparkSQL 甚至 c#。然而这里的问题是,真正的解决方案有数百万行和数百列,所以我需要一个可以利用 Spark 的动态解决方案。我们可以配置一个大集群,以确保我们充分利用它?

样本数据:

// Assign sample data to dataframe
val df = Seq(
    ( 1, "10/01/2021", 1, "abc", null ),
    ( 2, "11/01/2021", 1, null, "bbb" ),
    ( 3, "12/01/2021", 1, "ccc", null ),
    ( 4, "13/01/2021", 1, null, "ddd" ),

    ( 5, "10/01/2021", 2, "eee", "fff" ),
    ( 6, "11/01/2021", 2, null, null ),
    ( 7, "12/01/2021", 2, null, null )
    ).
    toDF("eventId", "timestamp", "itemId", "attrib1", "attrib2")

df.show

第 4 行和第 7 行作为摘要行的预期结果:

+-------+----------+------+-------+-------+
|eventId| timestamp|itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|      1|10/01/2021|     1|    abc|   null|
|      2|11/01/2021|     1|    abc|    bbb|
|      3|12/01/2021|     1|    ccc|    bbb|
|      4|13/01/2021|     1|    ccc|    ddd|
|      5|10/01/2021|     2|    eee|    fff|
|      6|11/01/2021|     2|    eee|    fff|
|      7|12/01/2021|     2|    eee|    fff|
+-------+----------+------+-------+-------+

我已查看此选项,但无法根据我的用例对其进行调整。

Spark / Scala: forward fill with last observation

我有一种可行的 SparkSQL 解决方案,但对于大量列来说它会非常冗长,希望能更容易维护:

%%sql
WITH cte (
SELECT
    eventId,
    itemId,
    ROW_NUMBER() OVER( PARTITION BY itemId ORDER BY timestamp ) AS rn,
    attrib1,
    attrib2
FROM df
)
SELECT
    eventId,
    itemId,
    CASE rn WHEN 1 THEN attrib1 
        ELSE COALESCE( attrib1, LAST_VALUE(attrib1, true) OVER( PARTITION BY itemId ) ) 
    END AS attrib1_xlast,
    CASE rn WHEN 1 THEN attrib2 
        ELSE COALESCE( attrib2, LAST_VALUE(attrib2, true) OVER( PARTITION BY itemId ) ) 
    END AS attrib2_xlast
    
FROM cte
ORDER BY eventId

【问题讨论】:

    标签: scala apache-spark pyspark azure-synapse


    【解决方案1】:

    对于许多columns,您可以创建一个expression,如下所示

    val window = Window.partitionBy($"itemId").orderBy($"timestamp")
    
    // Instead of selecting columns you could create a list of columns 
    val expr = df.columns
      .map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))
    
    df.select(expr: _*).show(false)
    

    更新:

    val mainColumns = df.columns.filterNot(_.startsWith("attrib"))
    val aggColumns = df.columns.diff(mainColumns).map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))
    
    df.select(( mainColumns.map(col) ++ aggColumns): _*).show(false)
    

    结果:

    +-------+----------+------+-------+-------+
    |eventId|timestamp |itemId|attrib1|attrib2|
    +-------+----------+------+-------+-------+
    |1      |10/01/2021|1     |abc    |null   |
    |2      |11/01/2021|1     |abc    |bbb    |
    |3      |12/01/2021|1     |ccc    |bbb    |
    |4      |13/01/2021|1     |ccc    |ddd    |
    |5      |10/01/2021|2     |eee    |fff    |
    |6      |11/01/2021|2     |eee    |fff    |
    |7      |12/01/2021|2     |eee    |fff    |
    +-------+----------+------+-------+-------+
    

    【讨论】:

    • 有没有办法排除某些列?
    • 您可以将drop 列作为df.columns.drop(0) 使用索引,也可以使用filterNot df.columns.filterNot(_.endsWith("eventId"))
    • 啊,好吧,我的意思是我只想在某些列上运行填充,并创建一个结果数据框,其中一些列没有被触及,有些列已经应用了填充,有意义吗?如果您查看我的 SQL 示例,您可以看到 eventId、timestamp 和 itemId 未更改,并且仅更改了 attrib1 和 2。因此,诸如“仅填写列 [attrib1, attrib2]”或“仅填写以 attrib 开头的列”之类的内容。有意义吗?
    • 请检查更新,您也可以自定义字段列表,而不是这样准备。
    • 感谢您的更新。如果您认为这是一个有趣的问题,请不要害怕投票!
    猜你喜欢
    • 2018-01-16
    • 1970-01-01
    • 2020-10-20
    • 2019-08-09
    • 2021-03-11
    • 2021-03-17
    • 1970-01-01
    • 2022-06-13
    • 1970-01-01
    相关资源
    最近更新 更多