【问题标题】:Pyspark - Cumulative sum with reset conditionPyspark - 具有重置条件的累积和
【发布时间】:2019-10-16 11:39:55
【问题描述】:

我有这个数据框

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  0|null|  1|
|  1| 3.0|  0|
|  2| 7.0|  0|
|  3|null|  1|
|  4| 4.0|  0|
|  5| 3.0|  0|
|  6|null|  1|
|  7|null|  1|
|  8|null|  1|
|  9| 5.0|  0|
| 10| 2.0|  0|
| 11|null|  1|
+---+----+---+

我需要做的是 C 列中值的累积总和,直到下一个值为零。

预期输出:

+---+----+---+----+
|  A|   B|  C|   D|
+---+----+---+----+
|  0|null|  1|   1|
|  1| 3.0|  0|   0|
|  2| 7.0|  0|   0|
|  3|null|  1|   1|
|  4| 4.0|  0|   0|
|  5| 3.0|  0|   0|
|  6|null|  1|   1|
|  7|null|  1|   2|
|  8|null|  1|   3|
|  9| 5.0|  0|   0|
| 10| 2.0|  0|   0|
| 11|null|  1|   1|
+---+----+---+----+

重现数据框:

from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum

x = sc.parallelize([
    [0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
    [5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])

# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1).otherwise(0))

【问题讨论】:

    标签: python dataframe apache-spark pyspark cumulative-sum


    【解决方案1】:

    创建一个临时列 (grp),每次列 C 等于 0(重置条件)时递增一个计数器,并将其用作累积总和的分区列。

    import pyspark.sql.functions as f
    from pyspark.sql import Window
    
    x.withColumn(
        "grp", 
        f.sum((f.col("C") == 0).cast("int")).over(Window.orderBy("A"))
    ).withColumn(
        "D",
        f.sum(f.col("C")).over(Window.partitionBy("grp").orderBy("A"))
    ).drop("grp").show()
    #+---+----+---+---+
    #|  A|   B|  C|  D|
    #+---+----+---+---+
    #|  0|null|  1|  1|
    #|  1| 3.0|  0|  0|
    #|  2| 7.0|  0|  0|
    #|  3|null|  1|  1|
    #|  4| 4.0|  0|  0|
    #|  5| 3.0|  0|  0|
    #|  6|null|  1|  1|
    #|  7|null|  1|  2|
    #|  8|null|  1|  3|
    #|  9| 5.0|  0|  0|
    #| 10| 2.0|  0|  0|
    #| 11|null|  1|  1|
    #+---+----+---+---+
    

    【讨论】:

    • 你能不能请'grp'部分。这很有趣,但我无法理解它是如何工作的。
    • 哇:(f.col("C") == 0).cast("int") - 创建一个布尔值,然后将其转换为 1,以便可以将其汇总到一个分区中。这对于某种性能实际上是必要的,还是只是“聪明”?
    • @stephen 我不记得说实话,但我认为这是必要的,因为sum 需要数字类型并且不会进行隐式转换。也许最新版本的 spark 处理它的方式不同。如果您尝试一下,请随时对此答案进行澄清更新。
    • @pault 最佳答案。救了我的命 +1
    猜你喜欢
    • 2018-05-02
    • 1970-01-01
    • 2020-11-17
    • 1970-01-01
    • 2018-08-09
    • 1970-01-01
    • 1970-01-01
    • 2021-10-02
    • 1970-01-01
    相关资源
    最近更新 更多