【问题标题】:How to combine multiple PCollections together and give it as input to a ParDo function如何将多个 PCollection 组合在一起并将其作为 ParDo 函数的输入
【发布时间】:2015-10-13 10:37:52
【问题描述】:

我有六个 PCollection 作为 KV。我想通过将组合的 (6) PCollection 作为 sideInput 对另一个 PCollection 执行 ParDo。

我尝试将所有 6 个 PCollection 作为单独的 sideInput 提供,如下所示

PCollection<TableRow> OutputRows = MyCollection.apply(ParDo.withSideInputs(Inp1, Inp2,...)
    .of(new DoFn<KV<String, String>, TableRow>() {
        ...
    }

但是当堆空间超过时它会抛出 OutOfMemoryError 。请就如何组合 PCollection 以作为另一个 PCollection 的输入提供建议。

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow


    【解决方案1】:

    Cloud Dataflow 提供了多种加入方式。

    PCollections 用作侧输入被广播到工作人员并加载到内存中。这听起来像你在做什么,如果 PCollection 大小的总和太大,就会解释 OOM。

    您提到值是键控的——另一种选择是使用CoGroupByKey

    为此,您将使用所有PCollections 创建一个KeyedPCollectionTuple,然后您将获得一个包含每个键的所有值的结果。使用像这样的CoGroupByKey 会打乱您的数据,以便使用给定键的结果的 ParDo 只需要读入关联的值:

    PCollection<KV<K, V1>> inp1 = ...;
    PCollection<KV<K, V2>> inp2 = ...;
    
    final  TupleTag<V1> t1 = new  TupleTag<>();
    final  TupleTag<V2> t2 = new  TupleTag<>();
    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(t1, inp1)
                           .and(t2, inp2)
                           .apply(CoGroupByKey.<K>create());
    
    PCollection<T> finalResultCollection =
      coGbkResultCollection.apply(ParDo.of(
       new  DoFn<KV<K, CoGbkResult>, T>() {
         @Override
         public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
         c.output(...some T...);
       }
     }));
    

    【讨论】:

      猜你喜欢
      • 2011-08-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多