【问题标题】:pyspark: count distinct over a windowpyspark:在窗口上计数不同
【发布时间】:2018-02-02 18:47:36
【问题描述】:

我刚刚尝试在窗口上执行countDistinct 并收到此错误:

AnalysisException: u'Distinct 窗口函数不受支持: 计数(不同颜色#1926)

有没有办法对 pyspark 中的窗口进行不同的计数?

下面是一些示例代码:

from pyspark.sql.window import Window    
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
                    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

这是我希望看到的输出:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql window-functions distinct-values


    【解决方案1】:

    编辑:

    正如 noleto 在下面的回答中提到的,现在有一个 approx_count_distinct 函数,因为 pyspark 2.1 可以在窗口上工作。


    原答案

    我发现我可以结合使用 collect_set 和 size 函数来模拟窗口上的 countDistinct 功能:

    from pyspark.sql.window import Window
    from pyspark.sql import functions as F
    
    #function to calculate number of seconds from number of days
    days = lambda i: i * 86400
    
    #create some test data
    df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                        (13, "2017-03-15T12:27:18+00:00", "red"),
                        (25, "2017-03-18T11:27:18+00:00", "red")],
                        ["dollars", "timestampGMT", "color"])
    
    #convert string timestamp to timestamp type             
    df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
    
    #create window by casting timestamp to long (number of seconds)
    w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
    
    #use collect_set and size functions to perform countDistinct over a window
    df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))
    
    df.show()
    

    这会导致前一周记录的颜色计数不同:

    +-------+--------------------+------+---------------------------------------+
    |dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
    +-------+--------------------+------+---------------------------------------+
    |     17|2017-03-10 15:27:...|orange|                                      1|
    |     13|2017-03-15 12:27:...|   red|                                      2|
    |     25|2017-03-18 11:27:...|   red|                                      1|
    +-------+--------------------+------+---------------------------------------+
    

    【讨论】:

    • 如果您的countDistinct 位于多个列之间怎么办? collect_set 只能采用单个列名。
    • 这有点麻烦,但我所做的一件事就是创建一个新列,它是两列的串联。就像您有一个名字列和一个姓氏列一样,添加第三列,即加在一起的两列。然后,您可以使用该新列进行收集集。
    • 有趣。我一直在使用的解决方法是在聚合中使用countDistinct 执行groupBy,然后将join 返回到分组的原始DataFrame。我想知道哪种方法对大型集群更有效?
    • 我认为添加新列会使用更多 RAM,特别是如果您要处理很多列,或者列很大,但不会增加太多计算复杂性。
    • 我注意到使用 orderBy 时出现性能问题,它将所有结果返回给驱动程序。
    【解决方案2】:

    @Bob Swain 的回答很好而且有效!从那时起,Spark version 2.1,Spark 提供了与countDistinct 等效的函数approx_count_distinct,它的使用效率更高,最重要的是,它支持对窗口进行计数。

    下面是替换代码:

    #approx_count_distinct supports a window
    df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))
    

    对于基数较小的列,结果应该与“countDistinct”相同。当数据集增长很多时,您应该考虑调整参数rsd - 允许的最大估计误差,这可以让您调整权衡精度/性能。

    【讨论】:

    • result 应该与 "countDistinct" 相同 - 对此有什么保证吗?如果我使用默认的 rsd = 0.05,这是否意味着对于基数
    猜你喜欢
    • 2020-10-10
    • 1970-01-01
    • 2016-06-30
    • 2020-02-09
    • 2018-08-27
    • 1970-01-01
    • 1970-01-01
    • 2016-10-23
    • 2018-07-11
    相关资源
    最近更新 更多