【问题标题】:dynamically join two spark-scala dataframes on multiple columns without hardcoding join conditions在多个列上动态连接两个 spark-scala 数据帧,无需硬编码连接条件
【发布时间】:2017-09-04 00:05:04
【问题描述】:

我想在多列上动态加入两个 spark-scala 数据帧。我会避免硬编码列名比较,如以下语句所示;

val joinRes = df1.join(df2, df1("col1") == df2("col1") and df1("col2") == df2("col2"))

这个查询的解决方案已经存在于pyspark版本中——在以下链接中提供 PySpark DataFrame - Join on multiple columns dynamically

我想使用 spark-scala 编写相同的代码

【问题讨论】:

    标签: scala join multiple-columns spark-dataframe


    【解决方案1】:

    在 scala 中,您可以像在 python 中一样使用类似的方式,但您需要使用 map 和 reduce 函数:

    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._
    
    val df1 = List("a,b", "b,c", "c,d").toDF("col1","col2")
    val df2 = List("1,2", "2,c", "3,4").toDF("col1","col2")
    
    val columnsdf1 = df1.columns
    val columnsdf2 = df2.columns
    
    val joinExprs = columnsdf1
       .zip(columnsdf2)
       .map{case (c1, c2) => df1(c1) === df2(c2)}
       .reduce(_ && _)
    
    val dfJoinRes = df1.join(df2,joinExprs)
    

    【讨论】:

    • val dfJoinRes = df1.join(df2,df1.columns.toSet.intersect(df2.columns.toSet).toSeq, "left") //这段代码在我的情况下也可以工作跨度>
    • 是的,它有效,我想先发布该答案,但我认为它不完整,因为如果 df1 和 df2 中的列具有不同的名称怎么办?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-20
    • 2023-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多