【发布时间】:2018-07-31 11:38:26
【问题描述】:
我正在尝试使用 JDBC 写入将 spark DF 插入 Postgres。 postgres 表对其中一列有唯一约束,当要插入的 df 违反约束时,整个批次被拒绝并且 spark 会话关闭,给出错误 duplicate key value 违反唯一约束,这是正确的数据重复(已存在于数据库中) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148
需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。
使用的代码是:
mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"}
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)
我该怎么做?
【问题讨论】:
-
如果不将自定义写入数据库作为 forEachPartition 的一部分,则无法跳过失败的批次。如果可以更改表约束,最好删除约束,然后将重复数据删除逻辑作为 SQL 查询的一部分运行。
-
@DavidGreenshtein 您能否说明您将在何处运行重复数据删除?谢谢。我有自己的看法,但对你的很感兴趣。
-
@David Greenshtein:感谢您的建议。在使用 forEachPartition 时出现错误:行类型不可迭代。虽然我可以找到一些使用 scala 的示例,但似乎没有 pyspark 等效代码。
标签: postgresql jdbc pyspark