【问题标题】:GroupBy and concat array columns pysparkGroupBy 和 concat 数组列 pyspark
【发布时间】:2018-07-02 13:02:38
【问题描述】:

我有这个数据框

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

我想转换成下面的df:

+-----+------------------+
|store|      values      |
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

我这样做了:

from  pyspark.sql import functions as F
df.groupBy("store").agg(F.collect_list("values"))

但解决方案有这个WrappedArrays

+-----+----------------------------------------------+
|store|collect_list(values)                          |
+-----+----------------------------------------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2    |[WrappedArray(2), WrappedArray(3)]            |
+-----+----------------------------------------------+

有没有办法将WrappedArrays 转换为串联数组?或者我可以换一种方式吗?

谢谢!

【问题讨论】:

  • 尝试编辑 answers 以提出新问题不仅不寻常,而且确实闻所未闻!请改为打开一个新问题,如有必要,请在此处链接...
  • 对不起!我是新手
  • 不用担心 - 没有伤害。如果您打开一个新问题,请在此处添加一个链接
  • @desertnaut 我发布了一个新问题stackoverflow.com/questions/48426895/…
  • Spark 现在支持flatten 功能。见 Hululu 的回答stackoverflow.com/a/65354392/5238639

标签: pyspark apache-spark-sql


【解决方案1】:

对于这样一个简单的问题,您还可以使用explode 函数。不过,我不知道与所选 udf 答案相比的性能特征。

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(['store', 'values'])

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# |    1|     1|
# |    1|     2|
# |    1|     3|
# |    1|     4|
# |    1|     5|
# |    1|     6|
# |    2|     2|
# |    2|     3|
# +-----+------+

df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store|           values |
# +-----+------------------+
# |1    |[4, 5, 6, 1, 2, 3]|
# |2    |[2, 3]            |
# +-----+------------------+

注意:您可以在聚合中使用 F.collect_set() 或在 df2 上使用 .drop_duplicates() 来删除重复值。

如果您想在收集的列表中维护有序值,我在另一个 SO 答案中找到了以下方法:

from pyspark.sql.window import Window

w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1    |1     |[1]                |
# |1    |2     |[1, 2]             |
# |1    |3     |[1, 2, 3]          |
# |1    |4     |[1, 2, 3, 4]       |
# |1    |5     |[1, 2, 3, 4, 5]    |
# |1    |6     |[1, 2, 3, 4, 5, 6] |
# |2    |2     |[2]                |
# |2    |3     |[2, 3]             |
# +-----+------+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

如果值本身不能确定顺序,您可以使用F.posexplode() 并在窗口函数中使用'pos' 列而不是'values' 来确定顺序。注意:您还需要一个更高级别的 order 列来对原始数组进行排序,然后使用数组中的位置来对数组的元素进行排序。

df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2) , (2, [2], 1),(2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values   |array_order|
# +-----+---------+-----------+
# |1    |[1, 2, 3]|1          |
# |1    |[4, 5, 6]|2          |
# |2    |[2]      |1          |
# |2    |[3]      |2          |
# +-----+---------+-----------+

df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values   |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1    |[1, 2, 3]|1          |0  |1  |
# |1    |[1, 2, 3]|1          |1  |2  |
# |1    |[1, 2, 3]|1          |2  |3  |
# |1    |[4, 5, 6]|2          |0  |4  |
# |1    |[4, 5, 6]|2          |1  |5  |
# |1    |[4, 5, 6]|2          |2  |6  |
# |2    |[2]      |1          |0  |2  |
# |2    |[3]      |2          |0  |3  |
# +-----+---------+-----------+---+---+

w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values   |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1    |[1, 2, 3]|1          |0  |1  |[1]                |
# |1    |[1, 2, 3]|1          |1  |2  |[1, 2]             |
# |1    |[1, 2, 3]|1          |2  |3  |[1, 2, 3]          |
# |1    |[4, 5, 6]|2          |0  |4  |[1, 2, 3, 4]       |
# |1    |[4, 5, 6]|2          |1  |5  |[1, 2, 3, 4, 5]    |
# |1    |[4, 5, 6]|2          |2  |6  |[1, 2, 3, 4, 5, 6] |
# |2    |[2]      |1          |0  |2  |[2]                |
# |2    |[3]      |2          |0  |3  |[2, 3]             |
# +-----+---------+-----------+---+---+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

编辑:如果您想保留一些列以备不时之需,并且不需要汇总,您可以将它们包含在 groupBy 中或在汇总后重新加入它们(示例如下)。如果它们确实需要聚合,只需按 'store' 分组,然后在 'other' 列上添加您需要的任何聚合函数到 .agg() 调用。

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a') , (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store|   values|other|
# +-----+---------+-----+
# |    1|[1, 2, 3]|    a|
# |    1|[4, 5, 6]|    a|
# |    2|      [2]|    b|
# |    2|      [3]|    b|
# +-----+---------+-----+

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# |    1|     1|    a|
# |    1|     2|    a|
# |    1|     3|    a|
# |    1|     4|    a|
# |    1|     5|    a|
# |    1|     6|    a|
# |    2|     2|    b|
# |    2|     3|    b|
# +-----+------+-----+

df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

df4 = (
    df.drop('values')
    .join(
        df2.groupBy('store')
        .agg(F.collect_list('values').alias('values')),
        on=['store'], how='inner'
    )
    .drop_duplicates()
)
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

【讨论】:

  • 如何使用这种方法保留其他列?
  • 您可以将它们与store 一起添加到groupBy 函数中,或者您可以将最终结果与store 列上的初始输入数据框连接起来。
  • 谢谢!我最终将它们添加到 groupby 中。
【解决方案2】:

现在,可以使用 flatten 功能,事情变得容易多了。 您只需在 groupby 之后展平收集的数组。

# 1. Create the DF

    df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store","values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

# 2. Group by store

    df = df.groupBy("store").agg(F.collect_list("values"))

+-----+--------------------+
|store|collect_list(values)|
+-----+--------------------+
|    1|[[1, 2, 3], [4, 5...|
|    2|          [[2], [3]]|
+-----+--------------------+

# 3. finally.... flat the array

    df = df.withColumn("flatten_array", F.flatten("collect_list(values)"))

+-----+--------------------+------------------+
|store|collect_list(values)|     flatten_array|
+-----+--------------------+------------------+
|    1|[[1, 2, 3], [4, 5...|[1, 2, 3, 4, 5, 6]|
|    2|          [[2], [3]]|            [2, 3]|
+-----+--------------------+------------------+

【讨论】:

  • 不错。只是补充。所有这些都可以通过df.groupBy("store").agg(F.flatten(F.collect_list("values"))) 一步完成
【解决方案3】:

从 PySpark 2.4 开始,您可以使用以下代码:

df = df.groupBy("store").agg(collect_list("values").alias("values"))

df = df.select("store", array_sort(array_distinct(expr("reduce(values, array(), (x,y) -> concat(x, y))"))).alias("values"))

【讨论】:

    【解决方案4】:

    你需要一个扁平化的 UDF;从你自己的df开始:

    spark.version
    # u'2.2.0'
    
    from pyspark.sql import functions as F
    import pyspark.sql.types as T
    
    def fudf(val):
        return reduce (lambda x, y:x+y, val)
    
    flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))
    
    df2 = df.groupBy("store").agg(F.collect_list("values"))
    df2.show(truncate=False)
    # +-----+----------------------------------------------+ 
    # |store|                         collect_list(values) | 
    # +-----+----------------------------------------------+ 
    # |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]| 
    # |2    |[WrappedArray(2), WrappedArray(3)]            | 
    # +-----+----------------------------------------------+
    
    df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
    df3.show(truncate=False)
    # +-----+------------------+
    # |store|           values |
    # +-----+------------------+
    # |1    |[1, 2, 3, 4, 5, 6]|
    # |2    |[2, 3]            |
    # +-----+------------------+
    

    更新(评论后):

    上面的 sn-p 只适用于 Python 2。对于 Python 3,您应该如下修改 UDF:

    import functools
    
    def fudf(val):
        return functools.reduce(lambda x, y:x+y, val)
    

    使用 Spark 2.4.4 测试。

    【讨论】:

    • pyspark/spark 2.4 上的这个示例失败,错误为NameError: name 'reduce' is not defined。你知道为什么吗?
    • @AlexOrtner 这是 Python 3 的问题,而不是 Spark 的问题;请看更新
    【解决方案5】:

    我可能会这样做。

    >>> df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
    >>> df.show()
    +-----+---------+
    |store|   values|
    +-----+---------+
    |    1|[1, 2, 3]|
    |    1|[4, 5, 6]|
    |    2|      [2]|
    |    2|      [3]|
    +-----+---------+
    
    >>> df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x,y: x + y).toDF(['store','values']).show()
    +-----+------------------+
    |store|            values|
    +-----+------------------+
    |    1|[1, 2, 3, 4, 5, 6]|
    |    2|            [2, 3]|
    +-----+------------------+
    

    【讨论】:

    • 我们将如何删除重复项
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-04-26
    • 2019-10-26
    • 1970-01-01
    • 2019-08-13
    • 2022-08-09
    • 1970-01-01
    • 2021-04-01
    相关资源
    最近更新 更多