【发布时间】:2020-06-29 03:24:41
【问题描述】:
我正在尝试将变量 SCD_filter 传递给 spark 中数据帧中的 where 子句,我收到错误但直接传递时它工作正常。我这样做是为了根据未来的不同场景动态传递此过滤器使用。
val SCD_filter = """currentDF.col("u_business_unit") <=> updatedDF.col("u_business_unit")
| and(currentDF.col("u_operation_level_2") <=> updatedDF.col("u_operation_level_2"))
| and(currentDF.col("u_operation_level_3") <=> updatedDF.col("u_operation_level_3"))""".stripMargin
然后我将变量传递给下面的代码:
val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(s"$SCD_filter") /// passing the variable which is causing the error
.show()
收到错误:
Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'currentdf' not found;
注意:当前的DF很好,因为当变量被删除时代码正在执行,我们将条件传递给变量的where子句。enter image description here
【问题讨论】:
-
那个变量应该是
Column类型 -
你能发布完整的代码吗?
-
有示例代码
-
你是如何创建
updatedDF这段代码在哪里?发布完整的代码,否则很难找到问题
标签: scala apache-spark apache-spark-sql