【问题标题】:Append List to a Spark Data frame column based on matching Keys from a second data frame根据来自第二个数据帧的匹配键将列表附加到 Spark 数据帧列
【发布时间】:2020-09-16 02:23:27
【问题描述】:

我有 2 个具有相同列名的 spark 数据框,并且希望在键列相互匹配时使用 df2 中同一列中的列表来扩展第一个 df 中的某些列。

df1:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1  |  k2  |list1  | list2   |list3|list4   |list5   |list 6 |
+----+---+--+-------+---------------------------------+-------+
|   a| 121  |[car1] |[price1] |[1]  |[False] |[0.000] |[vfdvf]|
|   b| 11   |[car3] |[price3] |[2]  |[False] |[1.000] |[00000]|
|   c| 23   |[car3] |[price3] |[4]  |[False] |[2.500] |[fdabh]|
|   d| 250  |[car6] |[price6] |[6]  |[True]  |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+


df2:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1  |  k2  |list1  | list2   |list3|list4   |list5   |list 6 |
+----+---+--+-------+---------------------------------+-------+
|   m| 121  |[car5] |[price5] |[5]  |[False] |[3.000] |[vfdvf]|
|   b| 11   |[car8] |[price8] |[8]  |[False] |[2.000] |[mnfaf]|
|   c| 23   |[car7] |[price7] |[7]  |[False] |[1.500] |[00000]|
|   n| 250  |[car9] |[price9] |[9]  |[False] |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+

由于包含项目列表的列彼此相关,因此订单必须保持不变。有没有办法只有当 key1 和 key2 在两个 dfs 之间匹配时才能将整个列表从 df2 附加到 df1?

结果应如下所示(我无法放入列表 6 列,但希望在结果中看到与其他列表列相同的模式):

   +--+--+-----------+---------------+-----+------------+--------------+
   |k1|k2|list1      | list2         |list3|list4       |list5        |
   +--+--+-----------+---------------+-----+------------+--------------+
   |b |11|[car3,car8]|[price3,price8]|[2,8]|[False,False]|[1.000,2.000]| 
   |c |23|[car3,car7]|[price3,price7]|[4,7]|[False,False]|[2.500,1.500]| 
   +--+--+-----------+---------------+-----+-------------+-------------+

我仍然是使用 UDF 的新手,在 stackoverflow 上找不到类似的问题,我发现的唯一类似的问题是使用 pandas(How to merge two list columns when merging DataFrames?),这对我的用例来说超级慢。对此的任何见解将不胜感激。

【问题讨论】:

    标签: pyspark apache-spark-sql user-defined-functions pyspark-dataframes


    【解决方案1】:

    首先,您需要像我在下面所做的那样创建您自己的架构,然后您的代码才能工作,请使用我更新的代码

    试试这个:你不需要 UDF,首先执行一个内部连接,然后将它连接起来

       from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    from pyspark.sql.types import *
    from datetime import datetime
    from pyspark.sql import *
    from collections import *
    from pyspark.sql.functions import udf,explode
    from pyspark.sql.types import StringType
    table_schema = StructType([StructField('key1', StringType(), True),
                         StructField('key2', IntegerType(), True),
                         StructField('list1', ArrayType(StringType()), False),
                         StructField('list2', ArrayType(StringType()), False),
                         StructField('list3', ArrayType(IntegerType()), False),
                         StructField('list4', StringType(), False),
                         StructField('list5', ArrayType(FloatType()), False),
                         StructField('list6', ArrayType(StringType()), False)
                         ])
    df= spark.createDataFrame(
        [
     (  "a", 121  ,["car1"] ,["price1"] ,[1]  ,["False"] ,[0.000] ,["vfdvf"]),
    (   "b", 11   ,["car3"] ,["price3"] ,[2]  ,["False"] ,[1.000] ,[00000]),
    (   "c", 23   ,["car3"] ,["price3"] ,[4]  ,["False"] ,[2.500] ,["fdabh"]),
    (   "d", 250  ,["car6"] ,["price6"] ,[6]  ,["True"]  ,[0.450] ,[00000])
           
            ],table_schema
        )
    
    df2= spark.createDataFrame(
        [
     ("m", 121  ,["car5"] ,["price5"] ,[5]  ,["False"] ,[3.000] ,["vfdvf"]),
    (   "b", 11   ,["car8"] ,["price8"] ,[8]  ,["False"] ,[2.000] ,["mnfaf"]),
    (   "c", 23   ,["car7"] ,["price7"] ,[7]  ,["False"] ,[1.500] ,[00000]),
    (  "n", 250  ,["car9"] ,["price9"] ,[9]  ,["False"] ,[0.450] ,[00000])
    
    ],table_schema
        )
    df.createOrReplaceTempView("A")
    df2.createOrReplaceTempView("B")
    spark.sql("select a.key1,a.key2,concat(a.list1,b.list1)List1 ,concat(a.list2,b.list2)List2, \
    concat(a.list3,b.list3)List3 ,concat(a.list4,b.list4)List4,\
              concat(a.list5,b.list5)List5 ,\
              concat(a.list6,b.list6)List6 \
    from A a inner join B  b on a.key1=b.key1 order by a.key1").show(truncate=False)
    
     +----+----+------------+----------------+------+--------------+----------+----------+
    |key1|key2|List1       |List2           |List3 |List4         |List5     |List6     |
    +----+----+------------+----------------+------+--------------+----------+----------+
    |b   |11  |[car3, car8]|[price3, price8]|[2, 8]|[False][False]|[1.0, 2.0]|[0, mnfaf]|
    |c   |23  |[car3, car7]|[price3, price7]|[4, 7]|[False][False]|[2.5, 1.5]|[fdabh, 0]|
    +----+----+------------+----------------+------+--------------+----------+----------+
    

    【讨论】:

    • 感谢分享这个解决方案,我忘了提到我的真实数据要复杂得多,每个列表列都可以有不同的数据类型(字符串、整数、数组、longtype、bytetype、doubletype 和 booleantype )。我尝试了这个解决方案并得到不匹配的类型错误(argumenet 需要字符串类型,但 column2 是 array 类型。有什么办法可以修复它?
    • 请发布您拥有的不同类型的列表和数组,我会做出相应的更改
    • 谢谢阿迪,刚刚用我希望在我的真实数据中看到的每一列中的数据类型编辑了这个问题。
    • 请看我更新的代码,我已经创建了自己的架构,那么你就不会得到那个错误
    • 解决方案仍然无法正常工作,只是注意到 spark sql 中的 concat 仅对字符串类型有效,我需要使用 UDF 进行适当的更改,一旦我将解决方案发布给其他人参考和我的未来参考也是。无论如何,感谢您分享您的想法。
    【解决方案2】:

    我找到了我的问题的答案,并希望将其发布在此处,以与可能面临相同问题的其他人分享并供我将来参考。

        from pyspark.sql.types import BooleanType
        from pyspark.sql.types import StringType
        from pyspark.sql.types import DoubleType
        from pyspark.sql.types import IntegerType
        from pyspark.sql.types import ArrayType
        from pyspark.sql.types import LongType
        from pyspark.sql.types import ByteType
        
        def concatTypesFunc(array1, array2): 
            final_array=array1+array2
            return final_array
    
        spark.udf.register("concat_types", concatTypesFunc, 
        ArrayType(BooleanType())
        spark.udf.register("concat_types", concatTypesFunc, 
        ArrayType(StringType())
        spark.udf.register("concat_types", concatTypesFunc, 
        ArrayType(DoubleType())
        spark.udf.register("concat_types", concatTypesFunc, 
        ArrayType(IntegerType())
        spark.udf.register("concat_types", concatTypesFunc, 
        ArrayType(LongType())
        spark.udf.register("concat_types", concatTypesFunc, 
        ByteType(LongType()) 
    
        df= spark.createDataFrame(
            [
         (  "a", 121  ,["car1"] ,["price1"] ,[1]  ,["False"] ,[0.000] ,["vfdvf"]),
        (   "b", 11   ,["car3"] ,["price3"] ,[2]  ,["False"] ,[1.000] ,[00000]),
        (   "c", 23   ,["car3"] ,["price3"] ,[4]  ,["False"] ,[2.500] ,["fdabh"]),
        (   "d", 250  ,["car6"] ,["price6"] ,[6]  ,["True"]  ,[0.450] ,[00000])
               
                ],table_schema
            )
        
        df2= spark.createDataFrame(
            [
         ("m", 121  ,["car5"] ,["price5"] ,[5]  ,["False"] ,[3.000] ,["vfdvf"]),
        (   "b", 11   ,["car8"] ,["price8"] ,[8]  ,["False"] ,[2.000] ,["mnfaf"]),
        (   "c", 23   ,["car7"] ,["price7"] ,[7]  ,["False"] ,[1.500] ,[00000]),
        (  "n", 250  ,["car9"] ,["price9"] ,[9]  ,["False"] ,[0.450] ,[00000])
        
        ],table_schema
            )
        df.createOrReplaceTempView("a")
        df2.createOrReplaceTempView("b")
        spark.sql("select a.key1, a.key2, concat_types(a.list1,b.list1)List1 ,concat_types(a.list2,b.list2)List2, \
        concat_types(a.list3,b.list3)List3 ,concat_types(a.list4,b.list4)List4,\
                  concat_types(a.list5,b.list5)List5 ,\
                  concat_types(a.list6,b.list6)List6 \
        from a inner join b on a.key1=b.key1 order by a.key1").show(truncate=False)
    
    
     +----+----+------------+----------------+------+--------------+----------+----------+
    |key1|key2|List1       |List2           |List3 |List4         |List5     |List6     |
    +----+----+------------+----------------+------+--------------+----------+----------+
    |b   |11  |[car3, car8]|[price3, price8]|[2, 8]|[False][False]|[1.0, 2.0]|[0, mnfaf]|
    |c   |23  |[car3, car7]|[price3, price7]|[4, 7]|[False][False]|[2.5, 1.5]|[fdabh, 0]|
    +----+----+------------+----------------+------+--------------+----------+----------+
    
    

    【讨论】:

      猜你喜欢
      • 2018-04-28
      • 2021-03-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-23
      • 1970-01-01
      相关资源
      最近更新 更多