【问题标题】:Merge PCollections in Google Dataflow在 Google Dataflow 中合并 PCollection
【发布时间】:2017-05-12 14:15:48
【问题描述】:

我是 Google Dataflow 的新手,我正在处理想要加入不同语言目录的目录数据。现在,我的进程自己解析这两种语言,并为每种语言创建一个 PCollection。此 PCollection 包含一个元素,该元素包含元数据、所有文章和所有类别。该模型已准备好容纳多种语言,我现在需要做的就是合并这 2 个元素(PCollections)。

但是.... Google 数据流文档没有提供有关如何“手动”合并这些集合的示例,并且应用上的输入始终需要特定的输入类型。 我尝试了 PCollectionList,但找不到可以处理所有包含 PCollection 及其实体的解决方案。然后我写了一个 Custom CombineFn 但我不知道如何应用它,以便将两个 Collections 合并。

所以我有 2 个 PCollection catalogCollection_de 和 catalogCollection_fr。如何处理这些以自定义加入它们并获得单个 PCollection?

非常感谢任何帮助

编辑:关于“合并/加入”在这种情况下意味着什么的问题。 PCollections 对象拥有一个名为 CatalogCollection 的 Pojo,其中包含元数据、类别和文章。这些 Pojo 中的每一个都有不同的文本,其中每个文本由 Map<LanguageKey, Description> 表示。对于每种语言,都有一个保存此 CatalogCollection 的 PCollection。当我想加入/合并它们时,我想将所有语言收集在一个 PCollection 中,这意味着,例如,对于每篇文章,我需要找到另一种语言的相应文章,并将所有描述放在第一个收集,最终应该收集所有语言。

【问题讨论】:

  • 您能否详细说明合并在这里的实际作用?一般来说,拥有单个元素的 PCollection 不会是可并行化的。拥有文章的 PCollection、类别的 PCollection 等可能会更好。每篇文章都可以通过它所属的语言等来键入。
  • Ben 所说的 +1,在这种情况下,您将需要 CoGroupByKey 转换来按键加入集合。 (也许 join 就是你所说的合并?)
  • 更新问题
  • Dataflow 并行执行 PCollection 中的元素。如果您有一个带有单个对象的 PCollection,它将不会并行执行,并且不会给您带来太多好处。与其拥有一个对象,不如拥有PCollection<KV<LanguageKey, Description>> 等。现在有很多元素。要对齐文章,您需要运行 DoFn 来创建 PCollection<KV<ArticleId, Article>>,然后您可以运行 GroupByKeyCoGroupByKey 来匹配文章。
  • 当我处于试验阶段时,我设法将我的 PCollections 与带有 sideInput 的自定义 DoFn “合并”,我可以在其中访问两个 POJO。现在我的自定义“合并”有效。在下一次迭代中,我将尝试使用 PCollections of Articles/Categories 等,Dataflow 的并行化可以做到这一点。 Dataflow 的主要因素是对 Google Datastore 的快速“读写”。非常感谢您的输入@Ben Chambers!当我进行下一次迭代时,也许我会回到那个线程并更新它

标签: merge google-cloud-dataflow dataflow


【解决方案1】:

您要查找的转换是 Flatten:

PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());

另请注意:我注意到您说“此集合包含一个元素”,这表明您也可能遇到问题,因为 Dataflow 将元素分布在工作人员之间以进行并行处理。仅使用元素,您可能无法获得预期的并行性优势。

【讨论】:

  • 但我似乎无法自定义展平操作。数据流如何知道该做什么?
  • 在将结果解析为 Json-String 后,它似乎只是连接了两个集合。我需要合并它们
  • 这里的“合并”是什么意思?
【解决方案2】:

对于我的用例,似乎要走的路是使用带有侧输入的 ParDo 并手动加入 PoJos(由于我的数据结构)。还有一个更新的管道,其中 PCollections 被分解为文章/类别等。我必须自定义合并它们。 现在的问题是,从 Datastore 读取将不起作用,但那是在另一个线程中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-01
    相关资源
    最近更新 更多