【问题标题】:Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?谷歌数据流管道中的数据存储输入是否可以一次处理一批 N 个条目?
【发布时间】:2016-05-06 01:01:04
【问题描述】:

我正在尝试执行一个数据流管道作业,该作业将在数据存储中的 N 个条目上执行一个函数。在我的例子中,这个函数将 100 个条目作为有效负载发送到某个 REST 服务。这意味着我想遍历一个数据存储实体中的所有条目,并一次将 100 个批处理条目发送到一些外部 REST 服务。

我目前的解决方案

  1. 从数据存储读取输入
  2. 创建与管道选项中指定的工作人员一样多的密钥(1 个工作人员 = 1 个密钥)。
  3. 按键分组,这样我们得到迭代器作为输出(第4步中的迭代器输入)
  4. 以编程方式对临时列表中的用户进行批处理,并将它们作为批处理发送到 REST 端点。

上面描述的伪代码场景(忽略细节):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // 4. Programatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            List<EntryPOJO> batchedEntries = new ArrayList<>();
            for (EntryPOJO entry : c.element().getValue()) {
                if (batchedEntries.size() >= BATCH_SIZE) {
                    sendToRESTEndpoint(batchedEntries);
                    batchedEntries = new ArrayList<>();
                }
                batchedEntries.add(entry);
            }
            sendToRESTEndpoint(batchedEntries);
        }
    }));

我当前解决方案的主要问题

GroupByKey 阻止执行最后一个 ParDo(阻止第 4 步),直到所有条目都分配给一个键。

解决方案通常有效,但我想并行执行所有操作(在从数据存储加载后立即将 100 个条目发送到 REST 端点),这在我当前的解决方案中是不可能的,因为 GroupByKey 在从数据库中获取每个条目并将其插入键值对之前不会输出任何数据。所以执行实际上分为两步:1. 从数据存储中获取所有数据并为其分配一个键,2. 批量处理条目

问题

所以我想知道是否有一些现有的功能可以做到这一点。或者至少在没有 GroupByKey 步骤的情况下获得 Iterable,这样批处理功能任务就不需要等待数据被转储。

【问题讨论】:

标签: google-cloud-dataflow google-cloud-datastore gcloud dataflow


【解决方案1】:

您可以在 DoFn 中批量处理这些元素。例如:

final int BATCH_SIZE = 100;

pipeline
  // 1. Read input from datastore  
  .apply(DatastoreIO.readFrom(datasetId, query))

  // 2. Programatically batch users
  .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {

    private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);

    @Override
    public void processElement(ProcessContext c) throws Exception {
      EntryPOJO entry = processEntity(c);
      accumulator.add(c);
      if (accumulator.size() >= BATCH_SIZE) {
        c.output(accumulator);
        accumulator = new ArrayList<>(BATCH_SIZE);
      }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
      if (accumulator.size() > 0) {
        c.output(accumulator);
      }
    }
  });

  // 3. Consume those bundles
  .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        sendToRESTEndpoint(batchedEntries);
    }
  }));

如果您不想要单独的“批处理”步骤,也可以将步骤 2 和 3 合并到一个 DoFn 中。

【讨论】:

  • 正是我想要的,解决了我的问题。感谢您的快速回复。起初我试图以这种方式实现它,但我忽略了“finishBundle”方法,如果你想处理剩余在累加器中的元素,这是至关重要的。
  • 您还应该重置finishBundle(或至少startBundle)中的列表,因为可以重复使用DoFns(请参阅issues.apache.org/jira/browse/BEAM-38
  • 如何使用 Python SDK 复制同样的行为?
猜你喜欢
  • 2017-05-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-27
  • 1970-01-01
  • 2016-10-15
  • 1970-01-01
  • 2018-10-19
相关资源
最近更新 更多