【问题标题】:Pyspark trying to write to DB2 table - truncate overwritePyspark 尝试写入 DB2 表 - 截断覆盖
【发布时间】:2021-09-10 18:19:58
【问题描述】:

我正在尝试使用 Pyspark (2.4) 将数据写入 IBM DB2 (10.5 fix pack 11)。 当我尝试执行下面的代码时

df.write.format("jdbc")
.mode('overwrite').option("url",'jdbc:db2://<host>:<port>/<DB>').
option("driver", 'com.ibm.db2.jcc.DB2Driver').
option('sslConnection', 'true')
.option('sslCertLocation','</location/***_ssl.crt?').
option("numPartitions", 1).
option("batchsize", 1000)
.option('truncate','true').
option("dbtable", '<TABLE>').
option("user",'<user>').
option("password", '<PW>')
.save()

作业抛出以下异常:

文件 “/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py”,第 326 行,在 get_return_value py4j.protocol.Py4JJavaError 中:一个错误 调用 o97.save 时发生。 : com.ibm.db2.jcc.am.SqlSyntaxErrorException:DB2 SQL 错误: SQLCODE=-104,SQLSTATE=42601, SQLERRMC=END-OF-STATEMENT;ABLE;立即,驱动程序=4.19.80 在 com.ibm.db2.jcc.am.b5.a(b5.java:747)

Job 正在尝试执行截断,但似乎 DB2 期待 ** IMMEDIATE** 关键字

在我上面的代码中,我传递的只是 dbtable 的名称,有没有办法传递 IMMEDIATE 关键字?

同样在 DB2 方面,有没有办法在打开会话时设置它?

仅供参考,我的代码没有截断可以工作,但是删除表并重新创建和加载,我不想在 prod 环境中这样做。

非常感谢您对如何解决此问题的任何想法。

【问题讨论】:

  • 如果 Db2 的 spark 方言当前不会生成 immediate 关键字,请考虑一种解决方法。对于 Db2-LUW,您可以调用存储过程来为您完成工作。如果您的帐户授权正确,您可以使用这样的预先存在的程序:CALL ADMIN_CMD('IMPORT FROM /dev/null OF DEL REPLACE INTO your_schema.your_table_name ')。这是在添加 truncate ... 立即语法之前如何在 Db2-LUW 上执行未记录的空表的方法。

标签: apache-spark jdbc pyspark db2


【解决方案1】:

Spark 2.4 中的DB2Dialect 不会覆盖默认的JDBCDialect 的截断表实现。 the code 中的评论建议重写此方法以返回适合您的数据库引擎的语句。

  /**
   * The SQL query that should be used to truncate a table. Dialects can override this method to
   * return a query that is suitable for a particular database. For PostgreSQL, for instance,
   * a different query is used to prevent "TRUNCATE" affecting other tables.
   * @param table The table to truncate
   * @param cascade Whether or not to cascade the truncation
   * @return The SQL query to use for truncating a table
   */
  @Since("2.4.0")
  def getTruncateQuery(
    table: String,
    cascade: Option[Boolean] = isCascadingTruncateTable): String = {
      s"TRUNCATE TABLE $table"
  } 

也许在 DB2 案例中,您实际上可以扩展 DB2Dialect 本身,添加您的 getTruncateQuery() 实现并定义您的“自定义”JDBC 协议,例如 "jdbc:mydb2"。然后,您可以在 JDBC 连接 URL .option("url",'jdbc:mydb2://&lt;host&gt;:&lt;port&gt;/&lt;DB&gt;') 中使用此协议。

【讨论】:

  • 这意味着我还需要构建和使用我自己版本的 spark 代码库,我们的是 AWS Glue 环境,我将探索可行性,感谢您的时间
  • 不是真的,您可以只构建一个包含您自己的方言类的 jar,然后将它与您的 db2 jdbc 驱动程序一起加载。我希望glue 对引入您自己的非OOTB 数据库有一些支持(尽管我不是aws 专家)。祝你好运!
  • 非常感谢您的提示,并且 Glue 可以添加用户 jar。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-10
  • 1970-01-01
  • 1970-01-01
  • 2022-01-18
  • 2019-07-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多