【问题标题】:Cloud Dataflow/Beam - PCollection lookup another PCollectionCloud Dataflow/Beam - PCollection 查找另一个 PCollection
【发布时间】:2019-02-09 15:39:22
【问题描述】:

a) 从有界源读取数据,在 Dataflow 中运行时 PCollection 的大小可以有多大? b) 在处理大数据时,假设大约 5000 万个 PCollection 数据试图查找另一个包含大约 1000 万个 PCollection 数据的 PCollection。可以做到吗,Beam/Dataflow 的性能如何?在 ParDo 函数中,假设我们只能传递一个输入并返回一个输出,那么如何基于 2 个输入数据集执行查找?我正在尝试查看类似于任何其他 ETL 工具的 Dataflow/Beam,在其中可以轻松查找以创建新的 PCollection。请提供任何代码 sn-ps,这可能会有所帮助。

我也看到了侧输入功能,但是侧输入真的可以容纳这么大的数据集吗,如果这样可以完成查找吗?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam dataflow


    【解决方案1】:

    您绝对可以使用侧输入来做到这一点,因为侧输入可能是任意大的。

    在 Java 中你会做这样的事情:

    Pipeline pipeline = Pipeline.create(options);
    PCollectionView<Map<...>> lookupCollection = pipeline
       .apply(new ReadMyLookupCollection())
       .apply(View.asMap());
    
    
    PCollection<..> mainCollection = pipeline
        .apply(new ReadMyPCollection())
        .apply(
            ParDo.of(new JoinPCollsDoFn()).withSideInputs(lookupCollection));
    
    class JoinPCollsDoFn<...> extends DoFn<...> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<...> siMap = c.sideInput(lookupCollection);
        String lookupKey = c.element().lookupKey;
        AugmentedElement result = c.element().mergeWith(siMap.get(lookupKey))
        c.output(result);
      }
    }
    

    FWIW,这有点伪代码,但它是您想要做的事情的 sn-p。如果您想进一步澄清,请告诉我。

    【讨论】:

    • 谢谢,如果我没记错的话,根据文档,侧面输入存在/复制到每个工人。因此,我认为我应该提供更大的缓存大小,以防将大型数据集作为侧输入。
    • 侧边输入可用,但在worker中不一定加载到内存中。每个跑步者以不同的方式实现侧面输入,但无论如何它都不应该对您的工作人员造成巨大的内存负担。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-24
    • 1970-01-01
    • 1970-01-01
    • 2022-12-31
    • 1970-01-01
    相关资源
    最近更新 更多