【问题标题】:How do I convert table row PCollections to key,value PCollections in Python?如何在 Python 中将表行 PCollections 转换为键、值 PCollections?
【发布时间】:2018-05-14 21:37:42
【问题描述】:

没有关于如何将 pCollections 转换为输入 .CoGroupByKey() 所需的 pCollections 的文档

上下文 本质上,我有两个大的 pCollections,我需要能够找到两者之间的差异,用于 II 型 ETL 更改(如果它在 pColl1 中不存在,则添加到 pColl2 中找到的嵌套字段),以便我能够保留 BigQuery 中这些记录的历史记录。

管道架构:

  1. 将 BQ 表读入 2 个 pCollections:dwsku 和 product。
  2. 对两个集合应用 CoGroupByKey() 以返回 --> 结果
  3. 解析结果以查找 dwsku 中的所有更改并将其嵌套到产品中。

我们会推荐任何帮助。我在 SO 上找到了一个 java 链接,它完成了我需要完成的同样的事情(但 Python SDK 上没有任何内容)。

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

是否有针对 Apache Beam,尤其是 Python SDK 的文档/支持?

【问题讨论】:

    标签: python-3.x google-cloud-dataflow apache-beam dataflow


    【解决方案1】:

    为了使CoGroupByKey() 正常工作,您需要拥有tuples 中的PCollections,其中第一个元素是,第二个元素是数据强>。

    在您的情况下,您说您有 BigQuerySource,在当前版本的 Apache Beam 中输出 PCollection of dictionaries (code),其中每个条目代表已读取的表中的一行。如上所述,您需要将此 PCollections 映射到元组。使用ParDo 很容易做到这一点:

    class MapBigQueryRow(beam.DoFn):
        def process(self, element, key_column):
            key = element.get(key_column)
            yield key, element
    
    
    data1 = (p
                | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
                | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))
    
    data2 = (p
                | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
                | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))
    
    co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())
    
    # do your processing with co_grouped here
    

    顺便说一句,Apache Beam 的 Python SDK 文档可以在 here 找到。

    【讨论】:

      猜你喜欢
      • 2020-03-29
      • 2018-12-02
      • 1970-01-01
      • 1970-01-01
      • 2018-11-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多