【Question title】:Spark Scala dataframe join dynamically using list of columns and joinExprs
【Posted】:2021-10-20 10:58:03
【Question】:

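I am writing a function that takes join keys and a condition as parameters and joins two DataFrames dynamically.

I understand that a Spark Scala DataFrame join can be done in the following ways: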

1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
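For context, forms 4 and 6 differ in more than their argument type. The sketch below is a minimal illustration using two small made-up DataFrames (`left`, `right` and their contents are assumptions, not the data from this question): joining on a `Seq[String]` keeps a single copy of each key column, while joining on a `Column` expression keeps the key columns from both sides.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in a notebook `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("join-forms").getOrCreate()
import spark.implicits._

// Hypothetical sample data, not taken from the question.
val left  = Seq((1, "Smith"), (2, "Rose")).toDF("emp_id", "name")
val right = Seq((1, "10"), (2, "20")).toDF("emp_id", "dept_id")

// Form 4: usingColumns. The key column appears once in the result.
val byNames = left.join(right, Seq("emp_id"), "inner")

// Form 6: joinExprs. Both emp_id columns are kept in the result.
val byExpr = left.join(right, left("emp_id") === right("emp_id"), "inner")

println(byNames.columns.mkString(", "))
println(byExpr.columns.mkString(", "))
```

Since an extra filter-like condition can only be expressed as a `Column`, a function that accepts both keys and a condition would most likely build on form 6.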

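The join keys/usingColumns parameter will be a list of column names. For the condition/joinExprs, I am not sure how to pass it, but it could be a string such as "df2(colname) == 'xyz'".

Based on this post, I came up with the following. It handles the list of join keys, but how can I add a condition as well? (Note: for simplicity, I use the same data for both DataFrames here.)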

 %scala
  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*) 
  val empDF2 = emp.toDF(empColumns:_*) 


val join_keys = Seq("emp_id","name") // this will be a parameter
val joinExprs = join_keys.map{case (c1) => empDF(c1) === empDF2(c1)}.reduce(_ && _) 

// How do I add to joinExprs, another joinExpr like "empDF2(dept_id) == 10" here?

empDF.join(empDF2,joinExprs,"inner").show(false)

【Comments】:

    Tags: scala dataframe apache-spark join


    【Solution 1】:

    You can append to joinExprs using &&:

    empDF.join(empDF2, joinExprs && (empDF2("dept_id") === 10), "inner").show(false)
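To turn this into the parameterized function the question asks for, one possible shape is sketched below. The helper name `joinOnKeys` and its parameters are hypothetical, not part of Spark; the sketch assumes the key columns have the same names on both sides and that the extra condition is passed as an optional `Column`.

```scala
import org.apache.spark.sql.{Column, DataFrame}

// Hypothetical helper (not a Spark API): joins on equally named key columns
// and optionally ANDs one extra condition onto the generated expression.
def joinOnKeys(
    left: DataFrame,
    right: DataFrame,
    joinKeys: Seq[String],
    extraCondition: Option[Column] = None,
    joinType: String = "inner"
): DataFrame = {
  require(joinKeys.nonEmpty, "joinKeys must contain at least one column name")
  // Build left(k) === right(k) for every key and combine the results with AND.
  val keyExpr: Column = joinKeys.map(k => left(k) === right(k)).reduce(_ && _)
  // Append the extra condition only when one was supplied.
  val fullExpr: Column = extraCondition.fold(keyExpr)(keyExpr && _)
  left.join(right, fullExpr, joinType)
}
```

With the DataFrames from the question, a call might look like `joinOnKeys(empDF, empDF2, Seq("emp_id", "name"), Some(empDF2("dept_id") === 10)).show(false)`. Using `Option[Column]` keeps the plain key-only join available without a dummy condition.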
    

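    【Discussion】:

    • Thanks, Raphael. I will try it. If I want to pass joinExprs && "empDF2("dept_id") === 10" as a parameter, what type would that be?
    • Its type should be Column.
    • Wouldn't something like val joinExprCol = empDF2("dept_id") === 10 be a bit odd?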
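If the condition really has to arrive as a string, as the question suggests, one option is to parse it with `org.apache.spark.sql.functions.expr`, which turns a SQL expression string into a `Column`. The sketch below is an assumption-laden illustration: the helper name `joinWithStringCondition` and the aliases `l` and `r` are made up here, and the string must be written in SQL syntax (for example `r.dept_id = 10`), not as Scala code like `empDF2("dept_id") === 10`.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.expr

// Hypothetical helper (not a Spark API). The aliases "l" and "r" are an
// assumption of this sketch so that the SQL string can name either side.
def joinWithStringCondition(
    left: DataFrame,
    right: DataFrame,
    joinKeys: Seq[String],
    condition: String, // SQL syntax, e.g. "r.dept_id = 10"
    joinType: String = "inner"
): DataFrame = {
  require(joinKeys.nonEmpty, "joinKeys must contain at least one column name")
  val l = left.as("l")
  val r = right.as("r")
  // Equality on every key column, combined with AND.
  val keyExpr: Column = joinKeys.map(k => l(k) === r(k)).reduce(_ && _)
  // expr parses the SQL string into a Column that is resolved during the join.
  l.join(r, keyExpr && expr(condition), joinType)
}
```

A call could then look like `joinWithStringCondition(empDF, empDF2, Seq("emp_id", "name"), "r.dept_id = 10")`. Passing a `Column` directly, as in the answer above, is checked earlier and is usually the simpler choice when the caller is Scala code.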