【问题标题】:pyspark: Merge two or more rows if column have equal values in different rowspyspark:如果列在不同行中具有相同的值,则合并两行或多行
【发布时间】:2017-12-28 07:55:15
【问题描述】:

由于数据量大,我不得不使用pyspark将不同行中的dataframe值(一个列表)组合起来。

这样的数据框:

x = sqlContext.createDataFrame([("A", ['1','2','3']),("B", ['4','2','5','6']),("C", ['2','4','9','10']),("D", ['11','12','15','16'])],["index", "num_group"])
+-----+----------------+
|index|       num_group|
+-----+----------------+
|    A|       [1, 2, 3]|
|    B|    [4, 2, 5, 6]|
|    C|   [2, 4, 9, 10]|
|    D|[11, 12, 15, 16]|
+-----+----------------+

我想通过具有相同元素的列表合并num_group,如下所示: (索引是一个无意义的值或字符串)

+-------------------------+
|                num_group|
+-------------------------+
|[1, 2, 3, 4, 5, 6, 9, 10]|
|         [11, 12, 15, 16]|
+-------------------------+

我想我可以使用 graphframes GraphX 来查找连接并根据不同行中的相等值合并两行或多行。

有可能吗?我不太了解documents 的例子。

任何帮助将不胜感激。

【问题讨论】:

    标签: python python-3.x python-2.7 pyspark spark-graphx


    【解决方案1】:

    您不需要使用GraphX 库。 您所需要的只是collect_listudfexplode 中的pyspark.sql.functions 函数和一些小的python 操作。

    因此,您要做的第一步是收集num_group 列中的所有lists

    from pyspark.sql import functions as F
    y = x.select(F.collect_list("num_group").alias("collected"))
    

    应该给你dataframe

    +----------------------------------------------------------------------------------------------------------+
    |collected                                                                                                 |
    +----------------------------------------------------------------------------------------------------------+
    |[WrappedArray(1, 2, 3), WrappedArray(4, 2, 5, 6), WrappedArray(2, 4, 9, 10), WrappedArray(11, 12, 15, 16)]|
    +----------------------------------------------------------------------------------------------------------+
    

    下一步将定义一个udf 函数来遍历所有收集的列表并检查每个列表中的元素,并根据您的要求创建一个包含合并列表的新列表数组。

    def computation(s):
        finalList = []
        finalList.append(list(str(i) for i in s[0]))
        for index in range(1, len(s)):
            for finals in finalList:
                check = False
                for x in s[index]:
                    if x in finals:
                        check = True
                        break
                if check == True:
                    finals_1 = finals + list(str(i) for i in s[index])
                    finalList.remove(finals)
                    finalList.append(sorted(list(set(str(i) for i in finals_1))))
                else:
                    finalList.append(list(str(i) for i in s[index]))
        return finalList
    
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType
    collecting_udf = udf(computation, ArrayType(StringType()))
    

    然后您可以只使用explode 函数将最终列表分成单独的行。

    from pyspark.sql.functions import explode
    y.select(explode(collecting_udf("collected")).alias("num_group"))
    

    你应该有以下输出

    +-------------------------+
    |num_group                |
    +-------------------------+
    |[1, 10, 2, 3, 4, 5, 6, 9]|
    |[11, 12, 15, 16]         |
    +-------------------------+
    

    【讨论】:

    • 感谢您的回复。我已经尝试过您几天前自己提供的方式,但是由于数据量大(100,000,000行)而卡住了。这就是为什么我必须找到其他方法来解决这个问题!
    • 你有其他解决这个问题的方法吗?
    猜你喜欢
    • 2018-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-11
    • 2021-07-16
    • 2020-05-13
    • 2022-01-11
    相关资源
    最近更新 更多