【问题标题】:How can we pass a variable to where clause in Spark Dataframe我们如何将变量传递给 Spark Dataframe 中的 where 子句
【发布时间】:2020-06-29 03:24:41
【问题描述】:

我正在尝试将变量 SCD_filter 传递给 spark 中数据帧中的 where 子句,我收到错误但直接传递时它工作正常。我这样做是为了根据未来的不同场景动态传递此过滤器使用。

  val SCD_filter = """currentDF.col("u_business_unit") <=> updatedDF.col("u_business_unit")
                     |      and(currentDF.col("u_operation_level_2") <=> updatedDF.col("u_operation_level_2"))
                     |      and(currentDF.col("u_operation_level_3") <=> updatedDF.col("u_operation_level_3"))""".stripMargin

然后我将变量传递给下面的代码:

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(s"$SCD_filter")  /// passing the variable which is causing the error
.show()

收到错误:

Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'currentdf' not found;

注意:当前的DF很好,因为当变量被删除时代码正在执行,我们将条件传递给变量的where子句。enter image description here

【问题讨论】:

  • 那个变量应该是Column类型
  • 你能发布完整的代码吗?
  • 有示例代码
  • 你是如何创建updatedDF 这段代码在哪里?发布完整的代码,否则很难找到问题

标签: scala apache-spark apache-spark-sql


【解决方案1】:

试试这个 -

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("*")
.where($"Column1='$data'")  /// passing the variable which is causing the error
.show()

【讨论】:

  • 实际上我必须对整个 where 条件进行参数化,因为它必须是用于加载多个表的通用脚本,其中 where 条件本身对于不同的表会有所不同..条件将是之间的比较两个数据框的列(当前和更新)
  • 列数将是动态的,因此不同表的 where 和 and 子句将发生变化,例如一个表 3 列将出现在 where 而另外 6 列需要进行比较
  • 啊。知道了。不幸的是,.where 或 .filter 不支持 CASE-WHEN 子句,就像它对 .select 所做的那样......你想把这个逻辑提高一层,并根据条件用适当的“硬编码”column_name 定义 DF 吗?
  • 实际上此代码将针对 5 个表运行,并且所有表将具有不同的列,并且所有条件都会不同..我也愿意提供更好的方法
【解决方案2】:

DataFrames 创建别名并在字符串中使用alias.column_name 之类的别名。

val SCD_filter = """
   (
     (currentDF.u_business_unit <=> updatedDF.u_business_unit) and 
     (currentDF.u_operation_level_2 <=> updatedDF.colu_operation_level_2) and 
     (currentDF.u_operation_level_3 <=> updatedDF.u_operation_level_3)
   )
"""
val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(SCD_filter)
.show()

【讨论】:

  • 与问题陈述中提到的相同的错误..虽然 currentDF 存在但无法读取它
【解决方案3】:

根据您的评论

实际上,此代码将针对 5 个表运行,并且所有表将具有不同的列,并且所有条件都会不同..我也愿意寻求更好的方法

我会用这样的方法解决这个问题,


import org.apache.spark.sql.functions._

val cond1 = $"u_business_unit"     <=> $"updatedDF.u_business_unit"
val cond2 = $"u_operation_level_2" <=> $"updatedDF.u_operation_level_2"
val cond3 = $"u_operation_level_3" <=> $"updatedDF.u_operation_level_3"

val SCD_filter = cond1.and(cond2).and(cond3)

val common_unchangedata = currentDF
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.where(SCD_filter) 
.show()

您可以将其扩展为具有一个函数 getCondition(tableName:String):Column,它根据您正在使用的数据类型在运行时构造适当的条件。

【讨论】:

    猜你喜欢
    • 2016-08-09
    • 1970-01-01
    • 1970-01-01
    • 2012-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-11
    • 1970-01-01
    相关资源
    最近更新 更多