【问题标题】:pyspark collect_set of column outside of groupbygroupby 之外的列的 pyspark collect_set
【发布时间】:2019-11-07 20:12:16
【问题描述】:

我正在尝试使用 collect_set 来获取 NOT 属于 groupby 的 categorie_names 字符串列表。 我的代码是

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame([
     ("1", "cat1", "Dept1", "product1", 7),
     ("2", "cat2", "Dept1", "product1", 100),
     ("3", "cat2", "Dept1", "product2", 3),
     ("4", "cat1", "Dept2", "product3", 5),
    ], ["id", "category_name", "department_id", "product_id", "value"])

df.show()
df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .show()

#            .agg( F.collect_set("category_name"))\

输出是

+---+-------------+-------------+----------+-----+
| id|category_name|department_id|product_id|value|
+---+-------------+-------------+----------+-----+
|  1|         cat1|        Dept1|  product1|    7|
|  2|         cat2|        Dept1|  product1|  100|
|  3|         cat2|        Dept1|  product2|    3|
|  4|         cat1|        Dept2|  product3|    5|
+---+-------------+-------------+----------+-----+

+-------------+----------+----------+
|department_id|product_id|sum(value)|
+-------------+----------+----------+
|        Dept1|  product2|         3|
|        Dept1|  product1|       107|
|        Dept2|  product3|         5|
+-------------+----------+----------+

我想要这个输出

+-------------+----------+----------+----------------------------+
|department_id|product_id|sum(value)| collect_list(category_name)|
+-------------+----------+----------+----------------------------+
|        Dept1|  product2|         3|  cat2                      |
|        Dept1|  product1|       107|  cat1, cat2                |
|        Dept2|  product3|         5|  cat1                      |
+-------------+----------+----------+----------------------------+

尝试 1

df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

我收到了这个错误:

pyspark.sql.utils.AnalysisException:“无法解析'category_name' 给定输入列:[department_id, product_id, sum(value)];;\n'聚合 [collect_set('category_name, 0, 0) AS collect_set(category_name)#35]\n+- 聚合 [department_id#2, product_id#3], [department_id#2, product_id#3, sum(value#4L) AS sum(value)#24L]\n +- LogicalRDD [id#0, category_name#1, department_id#2, product_id#3, value#4L]\n"

尝试 2 我将 category_name 作为 groupby 的一部分

df.groupby("category_name", "department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

有效,但输出不正确

+--------------------------+
|collect_set(category_name)|
+--------------------------+
|              [cat1, cat2]|
+--------------------------+

【问题讨论】:

    标签: group-by pyspark set collect


    【解决方案1】:

    您可以specify multiple aggregations within one agg()。您的案例的正确语法是:

    df.groupby("department_id", "product_id")\
        .agg(F.sum('value'), F.collect_set("category_name"))\
        .show()
    #+-------------+----------+----------+--------------------------+
    #|department_id|product_id|sum(value)|collect_set(category_name)|
    #+-------------+----------+----------+--------------------------+
    #|        Dept1|  product2|         3|                    [cat2]|
    #|        Dept1|  product1|       107|              [cat1, cat2]|
    #|        Dept2|  product3|         5|                    [cat1]|
    #+-------------+----------+----------+--------------------------+
    

    您的方法不起作用,因为第一个 .agg()pyspark.sql.group.GroupedData 上工作并返回一个新的 DataFrame。随后对agg 的调用实际上是pyspark.sql.DataFrame.agg,即

    df.groupBy.agg()的简写

    所以基本上第二次调用agg 是再次分组,这不是你想要的。

    【讨论】:

      猜你喜欢
      • 2016-10-01
      • 1970-01-01
      • 2016-02-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-05
      • 1970-01-01
      • 2018-07-02
      相关资源
      最近更新 更多