【问题标题】:flexible join condition in Spark (Scala)Spark(Scala)中的灵活连接条件
【发布时间】:2019-03-11 17:43:56
【问题描述】:

我想要一个灵活的连接条件,例如可以作为字符串传递(或任何其他建议?)。例如,在下面的语句中,FLEXIBLE_CONDITION 表达式可以在不同的运行中改变。

val df3 = df1.join(df2, FLEXIBLE_CONDITION, "fullouter")

几个例子:

 (1) df1(s"query") === df2 (s"query_df2") 
 (2) df1(s"id") === df2(s"id_df2") && df1(s"item") === df2(s"item_df2")
 (3) Or combination of (1) and (2) or any other condition

需要注意的是,根据它们进行join的列名是不同的。例如,在 (1) 中,df1 中的列名是 query,而 df2 中的列名是 query_df2,依此类推。

FLEXIBLE_CONDITION 不应该是硬编码的,但可以是一个输入,并且可能会经常更改。或者可以基于一组输入(例如列名)自动化。

【问题讨论】:

    标签: scala apache-spark join


    【解决方案1】:

    我想通了。这就是我要找的:

     val first :  String = unique_attrs(0)
     var expression : org.apache.spark.sql.Column = df1(first) === df2_r(s"$first" + "_df2")
     for (i <- 1 to unique_attrs.length - 1) {
       val attr : String = unique_attrs(1)
       expression = expression && df1(attr) === df2_r(s"$attr" + "_df2")
     }
    
     val df3 = df1.join(df2_r, expression, "fullouter")
    

    属性列表作为方法的输入(unique_attrs)给出。

    【讨论】:

      【解决方案2】:

      你可以提供表达式,应该在join中使用

      为此签名

      def join(right: Dataset[_], joinExprs: Column): DataFrame
      

      例如,

      val df1 = Seq(
          ("a1", "b1"),
          ("a2", "b2")
      ).toDF("a", "b")
      
      val df2 = Seq(
          ("b1", "a1"),
          ("b2", "a2")
      ).toDF("b1", "a1")
      
      df1.show
      df2.show
      

      输出

      +---+---+
      |  a|  b|
      +---+---+
      | a1| b1|
      | a2| b2|
      +---+---+
      
      +---+---+
      | b1| a1|
      +---+---+
      | b1| a1|
      | b2| a2|
      +---+---+
      

      您可以构建任何您希望 ant 提供加入的表达式

      val expression = df1("a") === df2("a1")
      val result = df1 join (df2, expression)
      
      result.show
      

      输出

      +---+---+---+---+
      |  a|  b| b1| a1|
      +---+---+---+---+
      | a1| b1| b1| a1|
      | a2| b2| b2| a2|
      +---+---+---+---+
      

      更新:

      您可以使用createOrReplaceTempView 例如

      df1.createOrReplaceTempView("df1")
      df2.createOrReplaceTempView("df2")
      
      val res = spark.sql("select * from df1 inner join df2 on df1.a == df2.a1")
      res.show
      

      输出

      +---+---+---+---+
      |  a|  b| b1| a1|
      +---+---+---+---+
      | a1| b1| b1| a1|
      | a2| b2| b2| a2|
      +---+---+---+---+
      

      结果是一样的,你可以提供 sql 查询作为字符串

      【讨论】:

      • 感谢您的评论。在这里再次使用表达式需要对条件进行硬编码。条件可以是输入,例如字符串或任何其他建议?
      • @Xan 我更新了我的解决方案,也许这是你需要的
      猜你喜欢
      • 2021-09-22
      • 1970-01-01
      • 2017-04-10
      • 2023-03-16
      • 1970-01-01
      • 1970-01-01
      • 2021-03-22
      • 1970-01-01
      • 2018-01-03
      相关资源
      最近更新 更多