【问题标题】:How to execute an update query in spark through JDBC如何通过 JDBC 在 Spark 中执行更新查询
【发布时间】:2017-11-27 06:43:59
【问题描述】:

我想使用 JDBC 对 spark 中的 sqlserver 数据执行所有 dml 操作,但是在执行 UPDATE 查询时遇到了问题。 下面是用于获取连接的代码和用于执行的查询,以及执行 UPDATE 查询时出现的异常。 任何有关如何克服此问题的帮助或任何指示都会有很大帮助。 提前致谢。

val jdbcDbTable = "dbName"

val jdbcSqlConnStr = "jdbc:sqlserver://xxxx:portno;" + "user=xx;password=xxx;"
val jdbcDF = sqlContext.read.format("jdbc")
  .options(Map("driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
               "url" -> jdbcSqlConnStr,
               "dbtable" -> jdbcDbTable))
  .load()
jdbcDF.registerTempTable("customer1")

val cust = sqlContext.sql("Select * from customer1")
cust.show()

问题是,select * 语句返回正确的结果,但是当我执行这样的更新语句时:

val upd = sqlContext.sql("update customer1  set C_NAME='newcustomer' " +
        " where C_CustKey=1471774")
upd.show()

我得到错误:

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'update' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)

== SQL ==
update customer1  set C_NAME='newcustomer'  where C_CustKey=1471774
^^^

【问题讨论】:

    标签: scala apache-spark jdbc


    【解决方案1】:

    您不是通过 JDBC 更新表,而是尝试更新 spark 目录表。 Spark 不支持更新语句。

    您可以通过 JDBC 连接(使用 Spark 或 Plain JDBC)执行查询,然后取回数据帧。

    【讨论】:

      猜你喜欢
      • 2023-01-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-12-26
      • 2020-01-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多