【问题标题】:Google Dataflow: Write to Datastore without overwriting existing entitiesGoogle Dataflow:写入数据存储区而不覆盖现有实体
【发布时间】:2017-06-09 15:54:51
【问题描述】:

TLDR:寻找一种在不通过 Dataflow 覆盖现有数据的情况下更新 Datastore 实体的方法

我正在使用 dataflow 2.0.0 (beam) 来更新 Google Datastore 中的实体。我的数据流从数据存储加载实体,更新它们,然后将它们保存回数据存储(覆盖现有实体)。

但是,在更新过程中,我还发现了可能已经存在或不存在的其他实体。为了防止覆盖现有实体,我之前会从 Datastore 加载所有实体并减少它们(按键分组),删除新的重复项。

随着实体数量的增加,我希望避免将所有实体加载到 Dataflow 中(而不是根据最旧的时间戳分批获取它们),但我遇到了旧实体在被覆盖时被覆盖的问题不在当前批次中。


我正在使用(在两个地方,一个用于现有实体,一个用于新实体)将实体写入 Dataflow:

collection.apply(DatastoreIO.v1().write().withProjectId("..."))

如果有类似DatastoreIO.v1().writeNew() 的方法,那就太好了,但遗憾的是它不存在。感谢您的帮助。

【问题讨论】:

  • 我不清楚您要做什么,因为您说您要覆盖实体并防止覆盖实体。你能澄清一下吗?另外,您是如何批量加载实体的?
  • 是的,我们完全应该通过 Dataflow 公开它。该服务支持 Insert、Upsert 和 Update - 但我们目前仅在 Dataflow 中公开 Upsert。
  • 非幂等写入的语义需要额外考虑如何处理重试,因为 Dataflow 会继续重试,直到成功写入每条记录。我们最终可能会遇到一些写入成功但 Dataflow 作业失败的情况,并且该作业的任何未来重试都不会成功。
  • @VikasKedigehalli 基本上通过选择早于 X 时间的实体或选择 Y 最旧的实体来进行批处理。是的,我基本上想做实体更新而不是覆盖/upserts
  • @MingweiSamuel 你能澄清一下“更新数据存储实体而不覆盖现有数据”的意思吗?根据 Datastore 文档cloud.google.com/datastore/docs/concepts/…,更新实体基本上是一种覆盖。

标签: java google-cloud-datastore google-cloud-dataflow apache-beam


【解决方案1】:

如果您想编写一个 Datastore 上不存在的新实体,只需使用新密钥创建一个并写入即可。

List<String> keyNames = Arrays.asList("L1", "L2"); // Somewhat you have new keys to store
PTransform<PCollection<Entity>, ?> write =
        DatastoreIO.v1().write().withProjectId(project_id); // This is a typical write operation

p.
    apply("GetInMemory", Create.of(keyNames)).setCoder(StringUtf8Coder.of()). // L1 and L2 are loaded
    apply("Proc1", ParDo.of(new DoFn<String, Entity>(){
        @ProcessElement
        public void processElement(ProcessContext c) {
            Key.Builder key = makeKey("k2", c.element());  // Generate an entity key
            final Entity entity = Entity.newBuilder().
                    setKey(key). // Set the key
                    putProperties("p1", makeValue(new String("test constant value")
                        ).setExcludeFromIndexes(true).build()).
                    build();
            c.output(entity);
        }
    })).
    apply(write); // Write them
p.run();

可以在我的代码库https://github.com/yiu31802/gcp-project/commit/cc224b34中引用整个代码

【讨论】:

  • 有没有办法在创建新实体时创建自动分配的键/ID?
猜你喜欢
  • 2019-02-17
  • 1970-01-01
  • 2015-10-20
  • 2012-04-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-04
  • 1970-01-01
相关资源
最近更新 更多