【问题标题】:Pyspark dataframe join with a given list of primary keysPyspark 数据框与给定的主键列表连接
【发布时间】:2022-02-26 18:33:54
【问题描述】:

我怀疑这是否可能。

让我们谈谈我的要求,我有多个主键的tableA

primary_key: ['user_id', 'role_id']这样的多个表有超过2个Pks,所有Pks定义在一个json中如下图。

{  "sourcetable": "app_setting",
   "schema": "dbo",
    "primarykey": [
                "application_code",
                "region_code",
                "country_code",
                "app_setting_key",
                "app_setting_value"
    ]
}

在同一张桌子上,我定义了 2 个数据框,

Df1 = spark.read.parquet(tableA)    # complete table
df2 = Df1.filter((df1.user_id == 1) & (df1.user_id==1)) # df2 is filter Df

现在我想加入这些 df1 和 df2

join_Df= Df1.join(df2 , df2[primary_key] == Df1["primary_key"], "inner")

但我得到了错误:

in join assert isinstance(on[0], Column), "on should be Column or list of Column" AssertionError: on should be Column or list of Column

这种加入是否可以通过 PK 列表进行?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-dataframes


    【解决方案1】:

    我认为您的意思是复合主键而不是多个 pks。

    您可以简单地将主键作为连接条件中的字符串列表传递:

     table_info = {
            "sourcetable": "app_setting", "schema": "dbo",
            "primarykey": ["application_code", "region_code", "country_code", "app_setting_key", "app_setting_value"]
        }
    
    df_result = df1.alias("DF1").join(df2.alias("DF2"), table_info["primarykey"]) 
    

    或者,如果您更喜欢使用列,您可以遍历组成 pk 的列名列表并创建如下连接条件:

    from functools import reduce
    from pyspark.sql import functions as F
    
    
    join_cols = [
        F.col(f"DF1.{c}") == F.col(f"DF2.{c}")
        for c in table_info["primarykey"]
    ]
    
    df_result = df1.alias("DF1").join(
        df2.alias("DF2"),
        reduce(lambda acc, x: acc & x, join_cols),
        "inner"
    )
    

    【讨论】:

    • primary_key = table_parameter['primarykey']df_result = df1.join(df2, primary_key, "leftanti")
    • 以上代码可以正常工作,但是如果任何表中的 pk 列表不是字符串,就会出现问题。这意味着在将此数据帧与另一个面临数据不兼容问题的数据帧合并时。
    【解决方案2】:

    我现在已经尝试过了,效果不错,但不知道这是否是最佳答案。

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    import pyspark.sql.functions as F
    
    spark = SparkSession.builder.appName('abc').getOrCreate()
    
    lst1 = [[1, 2, 3], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
    lst2 = [[2, 3, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
    lst3 = [[1, 2, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
    
    R1 = Row("A1", "A2", "A3")
    R2 = Row("B1", "B2", "B3")
    R3 = Row("C1", "C2", "C3")
    df1 = spark.sparkContext.parallelize([R1(*r) for r in zip(*lst1)]).toDF().alias('df1')
    df2 = spark.sparkContext.parallelize([R2(*r) for r in zip(*lst2)]).toDF().alias('df2')
    df3 = spark.sparkContext.parallelize([R3(*r) for r in zip(*lst3)]).toDF().alias('df3')
    
    list_tup = [(df1, df2, "df1.A1", "df2.B1"),
                (df2, df3, "df2.B1", "df3.C1"),
                (df1, df3, "df1.A1", "df3.C1")]
    
    df_1 = list_tup[0][0]
    for x in list_tup:
        df_1 = x[0].join(x[1], on=F.col(x[2]) == F.col(x[3]), how="left_outer")
    
    df_1.show()
    
    
    
    +---+---+---+----+----+----+
    | A1| A2| A3|  C1|  C2|  C3|
    +---+---+---+----+----+----+
    |  1|  A| aa|   1|   A|  aa|
    |  2|  B| bb|   2|   B|  bb|
    |  3|  C| cc|null|null|null|
    +---+---+---+----+----+----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-31
      • 1970-01-01
      • 2022-12-12
      • 1970-01-01
      • 2019-07-06
      • 1970-01-01
      • 1970-01-01
      • 2018-03-08
      相关资源
      最近更新 更多