【问题标题】:Batch Insert from Dataframe to DB ignoring failed row in Pyspark从数据框批量插入到数据库,忽略 Pyspark 中的失败行
【发布时间】:2018-07-31 11:38:26
【问题描述】:

我正在尝试使用 JDBC 写入将 spark DF 插入 Postgres。 postgres 表对其中一列有唯一约束,当要插入的 df 违反约束时,整个批次被拒绝并且 spark 会话关闭,给出错误 duplicate key value 违反唯一约束,这是正确的数据重复(已存在于数据库中) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。

使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"} 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我该怎么做?

【问题讨论】:

  • 如果不将自定义写入数据库作为 forEachPartition 的一部分,则无法跳过失败的批次。如果可以更改表约束,最好删除约束,然后将重复数据删除逻辑作为 SQL 查询的一部分运行。
  • @DavidGreenshtein 您能否说明您将在何处运行重复数据删除?谢谢。我有自己的看法,但对你的很感兴趣。
  • @David Greenshtein:感谢您的建议。在使用 forEachPartition 时出现错误:行类型不可迭代。虽然我可以找到一些使用 scala 的示例,但似乎没有 pyspark 等效代码。

标签: postgresql jdbc pyspark


【解决方案1】:

很遗憾,Spark 没有开箱即用的解决方案。我看到了许多可能的解决方案:

  1. 在 PostgreSQL 数据库中实现冲突解决的业务逻辑,作为 forEachPartition 函数的一部分。例如,捕获约束违反的异常,然后报告到日志中。

  2. 删除 PostgreSQL 数据库上的约束,使用自动生成的 PK 表示启用在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个 SQL 查询的一部分,或者每天/每小时运行重复数据删除。您可以查看示例here

  3. 1234563

我希望我的想法会有所帮助。

【讨论】:

  • 那么forEachPartiton 逻辑是做什么的呢?无法从您的第二点进行测量。 @David Greenshtein
  • 如果 PostgreSQL 中定义的约束字段没有很大的基数,想法是根据在 forEachPartition 之前在 PostgreSQL 中定义的约束重新分区数据 -> 准备一个包含相同行的批量约束值 -> 批量写入数据库 -> 如果失败日志并继续下一个批量
  • 我想我需要看看逻辑。我想我明白了,但不是我想我会想出的方法。很有趣。
【解决方案2】:

如果您对目标有唯一的约束,这是不可能的。目前没有使用这些技术的 UPSert 模式。您需要围绕这方面进行设计。

【讨论】:

  • 感谢您的帮助,但基本上我不是在寻找像 Upsert 这样的解决方案,如果记录重复,则无需更新记录。寻找类似 SSIS 所做的事情,将失败的行标记为错误并在批处理中插入所有其他行
  • 我明白了,但它不会飞,除非你像其他人所说的那样做,这与你原来的方法完全不同。所以我看到你认为你应该在写之前检查是否存在。有趣,热衷于记录您的最终解决方案
  • 如果目标很大怎么办?
  • 我不会检查数据行是否存在,因为这将成为性能瓶颈。但仍在寻找平衡的解决方案..希望尽快找到一个
  • 我的观点完全一致。所以我不确定大卫的提议是什么。我的赌注是没有唯一的约束,并且会定期对目标进行重复数据删除。请让我知道你是如何解决的。
猜你喜欢
  • 1970-01-01
  • 2020-10-21
  • 2020-07-18
  • 2012-06-04
  • 1970-01-01
  • 1970-01-01
  • 2021-04-09
  • 1970-01-01
相关资源
最近更新 更多