【问题标题】:Reading data from multiple pubusb to same bigquery从多个 pubusb 读取数据到同一个 bigquery
【发布时间】:2020-03-27 19:32:01
【问题描述】:

这个问题与理解在 apache beam 中连接 gcp 管道的语法有关。这是我当前管道的设置方式

options = dataflow_options(project_id=project_id, topic_name=topic_name, job_name=job_name)

p = apache_beam.Pipeline(options=options)

(p
    | 'read pubusb' >> apache_beam.io.ReadFromPubSub(topic=topic_path, with_attributes=True)
    | 'decode the message' >> apache_beam.ParDo(mydecoder())
    | 'persist to db' >> apache_beam.io.WriteToBigQuery(
            output_table,
            create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND
))

p.run()

有了这个,我可以创建一个看起来像这样的管道。

现在我真正想做的(假设我的解码器是相同的)是将多个 pubsub 连接到同一个解码器,即

我如何在 Apache Beam 中实现这一点

我忘了说几件事

  1. 所有主题基本上都是字节流。
  2. 从主题中读取数据时,数据之间没有公用键
  3. 每个主题都有不同的解码逻辑

我正在查看CoGroupby,但它需要一个公用密钥。

【问题讨论】:

  • 您可以使用flatten 将多个PCollection 合并为一个。但是你想如何决定哪个解码器用于哪个消息(你提到每个主题的解码逻辑不同)。

标签: python-3.x google-cloud-dataflow apache-beam


【解决方案1】:

使用flatten 将多个PCollection 合并为一个:

# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.

 merged = (
     (pcoll1, pcoll2, pcoll3)
     # A list of tuples can be "piped" directly into a Flatten transform.
     | beam.Flatten())

【讨论】:

  • 嘿 - 当我尝试使用 Flatten 时,我遇到了一个新问题。我在这里发布了问题:stackoverflow.com/questions/60937654/…
  • 你的其他帖子好像没了。你的问题解决了吗?
猜你喜欢
  • 1970-01-01
  • 2021-12-07
  • 1970-01-01
  • 1970-01-01
  • 2014-07-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多