【问题标题】:Pyspark group by and count data with conditionPyspark 按条件分组和计数数据
【发布时间】:2021-01-09 18:49:28
【问题描述】:

我想使用分组功能解决一些问题。 让我告诉你我的情况。 我的数据是这样的。

| columnA | columnB | columnC | columnD | columnE |
| ------- | ------- | ------- | ------- | ------- |
| PersonA | DataOne | 20210101|    1    |    2    |
| PersonA | DataOne | 20210102|    2    |    4    |
| PersonA | DataOne | 20210102|    3    |    4    |
| PersonA | DataTwo | 20201226|    2    |    4    |
| PersonA | DataTwo | 20201226|    7    |    1    |
| PersonA | DataTwo | 20201227|    3    |    2    |
| PersonB | DataOne | 20201225|    1    |    3    |
| PersonB | DataTwo | 20201225|    2    |    4    |
| PersonB | DataTwo | 20201226|    1    |    2    |

然后,我想做的事情是聚合 columnD, E 按列 A、B、C 分组,但仅使用 最大值(C 列)。

我在下面的代码中像这样完成了这项工作,但我一直在想这种方式更简单更快。

my_df = (The data above)
my_df_max = my_df.groupBy("columnA","columnB").agg(max("columnC").alias("columnC"))
result = my_df\
    .groupBy("columnA","columnB","columnC")\
    .agg(count("columnD").alias("columnD"),sum("columnE").alias("columnE"))\
    .alias("tempA")\
    .join(my_df_max.alias("tempB"), (col("tempA.columnA") == col("tempB.columnA")) & (col("tempA.columnB") == col("tempB.columnB")) & (col("tempA.columnC") == col("tempB.columnC")))\
    .select(col("tempA.columnA"),col("tempA.columnB"), col("tempA.columnC"), col("columnD"), col("columnE"))

我期望的结果如下所示。

|columnA|columnB|columnC |columnD|columnE|
|-------|-------|--------|-------|-------|
|PersonA|DataOne|20210102|   2   |   8   |
|PersonA|DataTwo|20201227|   1   |   2   |
|PersonB|DataOne|20201225|   1   |   3   |
|PersonB|DataTwo|20201226|   1   |   2   |

如果我碰巧知道实现这个工作的代码方式和SQL方式,我会很高兴的。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    一个可能更简洁的选项是首先通过 C 列中的最大值过滤您的数据框,然后进行聚合,(假设您的 spark 数据框名为 sdf):

    import pyspark.sql.functions as f
    
    sdf.withColumn('rankC', f.expr('dense_rank() over (partition by columnA, columnB order by columnC desc)'))\
        .filter(f.col('rankC') == 1)\
        .groupBy('columnA', 'columnB', 'columnC')\
        .agg(f.count('columnD').alias('columnD'), f.sum('columnE').alias('columnE'))\
        .show()
    
    +-------+-------+--------+-------+-------+
    |columnA|columnB| columnC|columnD|columnE|
    +-------+-------+--------+-------+-------+
    |PersonB|DataOne|20201225|      1|      3|
    |PersonA|DataOne|20210102|      2|      8|
    |PersonB|DataTwo|20201226|      1|      2|
    |PersonA|DataTwo|20201227|      1|      2|
    +-------+-------+--------+-------+-------+
    

    【讨论】:

      【解决方案2】:

      Spark SQL 方法来做到这一点。您可以在适当的窗口上使用rank() 过滤具有 max columnC 的行,然后进行分组和聚合。

      df.createOrReplaceTempView('df')
      
      result = spark.sql("""
          SELECT columnA, columnB, columnC, count(columnD) columnD, sum(columnE) columnE 
          FROM (
              SELECT *, rank() over(partition by columnA, columnB order by columnC desc) r 
              FROM df
          )
          WHERE r = 1
          GROUP BY columnA, columnB, columnC
      """)
      
      result.show()
      +-------+-------+--------+-------+-------+
      |columnA|columnB| columnC|columnD|columnE|
      +-------+-------+--------+-------+-------+
      |PersonB|DataOne|20201225|      1|      3|
      |PersonA|DataOne|20210102|      2|      8|
      |PersonB|DataTwo|20201226|      1|      2|
      |PersonA|DataTwo|20201227|      1|      2|
      +-------+-------+--------+-------+-------+
      

      【讨论】:

        猜你喜欢
        • 2015-07-22
        • 1970-01-01
        • 2021-02-03
        • 2019-02-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多