【Question Title】: Error when performing an inner join on Spark 2.0.1 DataFrame
【Posted】: 2017-08-12 00:58:34
【Question Description】:

Has anyone else run into this problem and found a workaround?

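I have been trying to update my code to use Spark 2.0.1 and Scala 2.11. Everything worked fine with Spark 1.6.0 and Scala 2.10. I have a straightforward DataFrame-to-DataFrame inner join that now returns an error. The data comes from AWS RDS Aurora. Note that the foo DataFrame below actually has 92 columns, not the two I show. The problem persists even with only two columns.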

Relevant information:

DataFrame 1 with schema

foo.show()

+--------------------+------+
|      Transaction ID|   BIN|
+--------------------+------+
|               bbBW0|134769|
|               CyX50|173622|
+--------------------+------+

println(foo.printSchema())

root
|-- Transaction ID: string (nullable = true)
|-- BIN: string (nullable = true)

DataFrame 2 with schema

bar.show()

+--------------------+-----------------+-------------------+
|              TranId|       Amount_USD|     Currency_Alpha|
+--------------------+-----------------+-------------------+
|               bbBW0|            10.99|                USD|
|               CyX50|           438.53|                USD|
+--------------------+-----------------+-------------------+

println(bar.printSchema())

root
|-- TranId: string (nullable = true)
|-- Amount_USD: string (nullable = true)
|-- Currency_Alpha: string (nullable = true)

Joining the DataFrames, with explain

val asdf = foo.join(bar, foo("Transaction ID") === bar("TranId"))
println(foo.join(bar, foo("Transaction ID") === bar("TranId")).explain())

== Physical Plan ==
*BroadcastHashJoin [Transaction ID#0], [TranId#202], Inner, BuildRight
:- *Scan JDBCRelation((SELECT

        ...
        I REMOVED A BUNCH OF LINES FROM THIS PRINT OUT
        ...

      ) as x) [Transaction ID#0,BIN#8] PushedFilters: [IsNotNull(Transaction ID)], ReadSchema: struct<Transaction ID:string,BIN:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *Filter isnotnull(TranId#202)
      +- InMemoryTableScan [TranId#202, Amount_USD#203, Currency_Alpha#204], [isnotnull(TranId#202)]
         :  +- InMemoryRelation [TranId#202, Amount_USD#203, Currency_Alpha#204], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         :     :  +- Scan ExistingRDD[TranId#202,Amount_USD#203,Currency_Alpha#204]

The error I get is:

16/10/18 11:36:50 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ID IS NOT NULL)' at line 54

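The full stack trace can be seen here (http://pastebin.com/C9bg2HFt)

Nowhere in my code, or in the JDBC query that pulls the data from the database, is there an ID IS NOT NULL). I spent a lot of time googling and found a commit to Spark that adds null filters to the query plan for joins. Here is the commit (https://git1-us-west.apache.org/repos/asf?p=spark.git;a=commit;h=ef770031)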
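
One plausible reading of the error, given PushedFilters: [IsNotNull(Transaction ID)] in the plan, is that the null filter is sent to MySQL with the column name unquoted, so the space in Transaction ID splits the identifier and the parser stops at ID IS NOT NULL). The sketch below only illustrates that string-level difference. The helper names are hypothetical and this is not Spark's actual implementation.

```scala
object PushdownQuotingSketch {
  // Hypothetical stand-in: renders an IsNotNull filter using the column
  // name verbatim, which breaks when the name contains a space.
  def isNotNullUnquoted(column: String): String =
    s"$column IS NOT NULL"

  // MySQL-style identifier quoting: wrap in backticks and double any
  // backtick that appears inside the name.
  def quoteMySql(column: String): String =
    "`" + column.replace("`", "``") + "`"

  // Same filter, but with the identifier quoted so MySQL reads it as one name.
  def isNotNullQuoted(column: String): String =
    s"${quoteMySql(column)} IS NOT NULL"

  def main(args: Array[String]): Unit = {
    println(isNotNullUnquoted("Transaction ID")) // Transaction ID IS NOT NULL
    println(isNotNullQuoted("Transaction ID"))   // `Transaction ID` IS NOT NULL
  }
}
```

The unquoted form matches the fragment quoted in the MySQL error message, which is why the text appears even though it is not in the original query.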

【Question Discussion】:

    Tags: scala apache-spark spark-dataframe


    【Solution 1】:

    Curious whether you have tried the following:

    val dfRenamed = bar.withColumnRenamed("TranId", "Transaction ID")
    val newDF = foo.join(dfRenamed, Seq("Transaction ID"), "inner")
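
    Another approach that may be worth trying, offered as an untested sketch and not as part of the suggestion above: alias the column inside the JDBC subquery so that the name Spark sees contains no space, then join on the alias. The table name transactions and the alias TransactionId are assumptions for illustration; the real query in the question is elided and is not reconstructed here.

```scala
object AliasedSubquerySketch {
  // MySQL-style identifier quoting for the original column names.
  def quoteMySql(name: String): String =
    "`" + name.replace("`", "``") + "`"

  // Builds a derived-table expression "(SELECT `old` AS alias, ... FROM table) AS x"
  // suitable for the JDBC data source's "dbtable" option.
  def aliasedSubquery(table: String, renames: Seq[(String, String)]): String = {
    val selectList = renames
      .map { case (original, alias) => s"${quoteMySql(original)} AS $alias" }
      .mkString(", ")
    s"(SELECT $selectList FROM $table) AS x"
  }

  def main(args: Array[String]): Unit = {
    val dbtable = aliasedSubquery(
      "transactions", // hypothetical table name
      Seq("Transaction ID" -> "TransactionId", "BIN" -> "BIN"))
    println(dbtable)
    // Intended use (assumed, not executed here): pass dbtable to
    //   spark.read.format("jdbc").option("dbtable", dbtable) ... .load()
    // and join with foo("TransactionId") === bar("TranId").
  }
}
```

    With an alias that needs no quoting, any null filter pushed down for the join would reference TransactionId, which MySQL can parse without backticks.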
    
