【发布时间】:2017-06-09 15:54:51
【问题描述】:
TLDR:寻找一种在不通过 Dataflow 覆盖现有数据的情况下更新 Datastore 实体的方法
我正在使用 dataflow 2.0.0 (beam) 来更新 Google Datastore 中的实体。我的数据流从数据存储加载实体,更新它们,然后将它们保存回数据存储(覆盖现有实体)。
但是,在更新过程中,我还发现了可能已经存在或不存在的其他实体。为了防止覆盖现有实体,我之前会从 Datastore 加载所有实体并减少它们(按键分组),删除新的重复项。
随着实体数量的增加,我希望避免将所有实体加载到 Dataflow 中(而不是根据最旧的时间戳分批获取它们),但我遇到了旧实体在被覆盖时被覆盖的问题不在当前批次中。
我正在使用(在两个地方,一个用于现有实体,一个用于新实体)将实体写入 Dataflow:
collection.apply(DatastoreIO.v1().write().withProjectId("..."))
如果有类似DatastoreIO.v1().writeNew() 的方法,那就太好了,但遗憾的是它不存在。感谢您的帮助。
【问题讨论】:
-
我不清楚您要做什么,因为您说您要覆盖实体并防止覆盖实体。你能澄清一下吗?另外,您是如何批量加载实体的?
-
是的,我们完全应该通过 Dataflow 公开它。该服务支持 Insert、Upsert 和 Update - 但我们目前仅在 Dataflow 中公开 Upsert。
-
非幂等写入的语义需要额外考虑如何处理重试,因为 Dataflow 会继续重试,直到成功写入每条记录。我们最终可能会遇到一些写入成功但 Dataflow 作业失败的情况,并且该作业的任何未来重试都不会成功。
-
@VikasKedigehalli 基本上通过选择早于 X 时间的实体或选择 Y 最旧的实体来进行批处理。是的,我基本上想做实体更新而不是覆盖/upserts
-
@MingweiSamuel 你能澄清一下“更新数据存储实体而不覆盖现有数据”的意思吗?根据 Datastore 文档cloud.google.com/datastore/docs/concepts/…,更新实体基本上是一种覆盖。
标签: java google-cloud-datastore google-cloud-dataflow apache-beam