【问题标题】:aggregate function Count usage with groupBy in Spark在 Spark 中使用 groupBy 聚合函数计数使用情况
【发布时间】:2017-06-12 22:18:01
【问题描述】:

我试图在 pySpark 的一行代码中进行多项操作, 并且不确定这是否适用于我的情况。

我的目的是不必将输出保存为新的数据框。

我目前的代码比较简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

我的意图是在使用groupBy 之后添加count(),以获取匹配timePeriod 列的每个值的记录数,打印\显示为输出。

尝试使用 groupBy(..).count().agg(..) 时出现异常。

有什么方法可以同时实现count()agg().show() 打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

或者更好的是,将合并的输出合并到agg.show() 输出 - 一个额外的列,说明与行的值匹配的记录计数。例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

【问题讨论】:

    标签: java scala apache-spark pyspark apache-spark-sql


    【解决方案1】:

    count() 可以在agg() 中使用,因为groupBy 表达式相同。

    使用 Python

    import pyspark.sql.functions as func
    
    new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
      .groupBy("timePeriod")
      .agg(
         func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
         func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
         func.count(func.lit(1)).alias("Num Of Records")
       )
      .show(20, False)
    

    pySpark SQL functions doc

    使用 Scala

    import org.apache.spark.sql.functions._ //for count()
    
    new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
      .groupBy("timePeriod")
      .agg(
         mean("DOWNSTREAM_SIZE").alias("Mean"), 
         stddev("DOWNSTREAM_SIZE").alias("Stddev"),
         count(lit(1)).alias("Num Of Records")
       )
      .show(20, false)
    

    count(1) 将按等于count("timePeriod") 的第一列计算记录

    使用 Java

    import static org.apache.spark.sql.functions.*;
    
    new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
      .groupBy("timePeriod")
      .agg(
         mean("DOWNSTREAM_SIZE").alias("Mean"), 
         stddev("DOWNSTREAM_SIZE").alias("Stddev"),
         count(lit(1)).alias("Num Of Records")
       )
      .show(20, false)
    

    【讨论】:

    • 数据框中有什么方法可以按分区计算所有记录并将其汇总到最终计数,如果有的话如何?
    • 你的意思是类似于reduceBy
    • 一个小的语法评论:我是 Python 中 dict 语法的忠实粉丝,例如.agg({"X: "sum", "Y": "sum", "Z": "sum", "blah": "count"}),这对 .withColumn("blah", lit(1)) 非常有效 - 可能有更好的方法,但我还没有找到它(还没有!)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-12
    • 2021-04-25
    • 1970-01-01
    • 1970-01-01
    • 2013-01-11
    • 2018-09-03
    相关资源
    最近更新 更多