【问题标题】:Anti group by/R apply in Pyspark反 group by/R 适用于 Pyspark
【发布时间】:2020-04-17 18:17:23
【问题描述】:

我是一名进入 pyspark 世界的 R 程序员,已经掌握了很多基本技巧,但我仍在努力解决的问题是我会做的事情 apply 或基本 for 循环。

在这种情况下,我试图计算一个 ID 的“anti-groupby”。基本上,这个想法是查看该 ID 的总体,然后查看非该 ID 的总体,并将这两个值放在同一行上。使用 groupby 获取该 ID 的人口很容易,然后将其加入以 new_id 作为唯一列的数据集。

这就是我在 R 中的做法:

anti_group <- function(id){
    tr <- sum(subset(df1, new_id!=id)$total_1)
    to <- sum(subset(df1, new_id!=id)$total_2)
    54 * tr / to
  }
  test$other.RP54 <- sapply(test$new_id, anti_group  )

我将如何在 pyspark 中做到这一点?

谢谢!

编辑:

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

然后是一些创建最终数据帧的函数,如下所示:

+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

【问题讨论】:

    标签: r pyspark group-by sapply pyspark-dataframes


    【解决方案1】:

    我认为您可以通过两步来做到这一点:首先按 id 求和,然后取总计并减去该 id 的值。

    我的想法有点像dplyr中的group_by(id) %&gt;% summarise(x = sum(x)) %&gt;% mutate(y = sum(x) - x)

    我提出的解决方案是基于Window 函数。未经测试:

    让我们先创建数据

    import pyspark.sql.functions as psf
    import pyspark.sql.window as psw
    
    df = spark.createDataFrame([(1,40),(1,30),(2,10),(2,90),(3,20),(3,10),(4,2),(4,5)], ['id','value'])
    
    df.show(2)
    
    +---+-----+
    | id|value|
    +---+-----+
    |  1|   40|
    |  1|   30|
    +---+-----+
    only showing top 2 rows
    

    然后应用该方法:

    w = psw.Window.orderBy()
    df_id = df.groupBy("id").agg(psf.sum("value").alias("grouped_total"))
    df_id = (df_id
              .withColumn("anti_grouped_total",psf.sum("grouped_total").over(w))
              .withColumn('anti_grouped_total', psf.col('anti_grouped_total') - psf.col('grouped_total'))
            )
    
    df_id.show(2)
    +---+-------------+------------------+
    | id|grouped_total|anti_grouped_total|
    +---+-------------+------------------+
    |  3|           30|               177|
    |  1|           70|               137|
    +---+-------------+------------------+
    only showing top 2 rows
    
    

    【讨论】:

    • 有趣,这绝对是一种方法。只需做一个分组,然后将全部人口总和附加到数据框并做很容易的差异。我在编辑中添加了一些示例数据和示例输出,以便更加清晰。
    • 感谢您的帮助!你们基本上有同样的答案,他基本上只是第一个。感谢您的 R 相似性。
    • @BaseballR 你可以给他接受的答案,idc。我只是在这里帮助和学习。干杯!
    【解决方案2】:

    因此没有可以复制该 groupBy 函数的内置函数,但您可以轻松地通过使用 case(when/otherwise clause) 创建一个新列来创建您的 组和反组,然后是 groupBy 在那个 new column 上。

    #df.show()
    #sample data
    +---+-----+
    | id|value|
    +---+-----+
    |  1|   40|
    |  1|   30|
    |  2|   10|
    |  2|   90|
    |  3|   20|
    |  3|   10|
    |  4|    2|
    |  4|    5|
    +---+-----+
    
    from pyspark.sql import functions as F
    df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
      .groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()
    
    +---------+---+
    |anti_id_1|sum|
    +---------+---+
    |        1| 70|
    |    Not_1|137|
    +---------+---+
    

    UPDATE:

    from pyspark.sql.window import Window
    from pyspark.sql import functions as F
    
    w1=Window().partitionBy("id")
    w=Window().partitionBy()
    df.withColumn("grouped_total",F.sum("value").over(w1))\
      .withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
      .groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
                         F.first("anti_grouped_total").alias("anti_grouped_total"))\
      .drop("value").orderBy("id").show()
    
    
    +---+-------------+------------------+
    | id|grouped_total|anti_grouped_total|
    +---+-------------+------------------+
    |  1|           70|               137|
    |  2|          100|               107|
    |  3|           30|               177|
    |  4|            7|               200|
    +---+-------------+------------------+
    

    Less verbose/concise 实现相同输出的方式:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    w = Window().partitionBy()
    df.groupBy("id").agg(F.sum("value").alias("grouped_total"))\
              .withColumn("anti_grouped_total",F.sum("grouped_total").over(w)-F.col("grouped_total")).orderBy("id"),show()
    

    For 2 value columns:

    df.show()
    +---+------+------+
    | id|value1|value2|
    +---+------+------+
    |  1|    40|    50|
    |  1|    30|    70|
    |  2|    10|    91|
    |  2|    90|    21|
    |  3|    20|    42|
    |  3|    10|     4|
    |  4|     2|    23|
    |  4|     5|    12|
    +---+------+------+
    
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F
    
    w = Window().partitionBy()
    df.groupBy("id").agg(F.sum("value1").alias("grouped_total_1"),F.sum("value2").alias("grouped_total_2"))\
              .withColumn("anti_grouped_total_1",F.sum("grouped_total_1").over(w)-F.col("grouped_total_1"))\
              .withColumn("anti_grouped_total_2",F.sum("grouped_total_2").over(w)-F.col("grouped_total_2")).orderBy("id").show()
    
    +---+---------------+---------------+--------------------+--------------------+
    | id|grouped_total_1|grouped_total_2|anti_grouped_total_1|anti_grouped_total_2|
    +---+---------------+---------------+--------------------+--------------------+
    |  1|             70|            120|                 137|                 193|
    |  2|            100|            112|                 107|                 201|
    |  3|             30|             46|                 177|                 267|
    |  4|              7|             35|                 200|                 278|
    +---+---------------+---------------+--------------------+--------------------+
    

    【讨论】:

    • 我添加了更多信息以便使用您的示例数据更清晰(抱歉!),感谢您的帮助。如果我们有 4 个 ID 并且我可以手动迭代,那么你的工作有效,但如果我有 1000 多个值,它就不会很遗憾。
    • 快速提问,如果我想创建两个分组列和两个反分组列怎么办?就像有一个值 1 和一个值 2,想要分组值 1、反分组值 1、分组值 2 和反分组值 2?
    • 您将在 groupby 的 agg 中添加另一个总和,并添加另一个 withcolumn 以使用窗口计算差异。检查我的 2 个值列的更新。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-07
    • 2011-03-12
    • 1970-01-01
    • 2017-10-20
    • 1970-01-01
    • 2021-04-03
    相关资源
    最近更新 更多