【问题标题】:Streaming from Pub/Sub to BigQuery从 Pub/Sub 流式传输到 BigQuery
【发布时间】:2018-02-15 11:37:29
【问题描述】:

我正在尝试使用 python 数据流将一些数据从 google PubSub 流式传输到 BigQuery。 出于测试目的,我通过设置将以下代码 https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py 改编为流式管道

options.view_as(StandardOptions).streaming = True

然后我将 record_ids 管道更改为从 Pub/Sub 读取

# ADDED THIS
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15))
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
        OUTPUT,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

注意:我已被 google 列入白名单以运行代码(alpha 版)

现在我尝试时出现错误

工作流程失败。原因:(f215df7c8fcdbb00):未知的流接收器:bigquery

你可以在这里找到完整的代码:https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

我认为这与现在的流式管道类型有关,谁能告诉我如何在流式管道中进行 bigQuery 写入?

【问题讨论】:

    标签: python google-bigquery google-cloud-platform google-cloud-dataflow google-cloud-pubsub


    【解决方案1】:

    Beam Python 不支持从流式传输管道写入 BigQuery。现在,您需要使用 Beam Java - 您可以分别使用 PubsubIO.readStrings()BigQueryIO.writeTableRows()

    【讨论】:

    • 好的,谢谢尤金。我希望使用python。你知道这在未来是否会改变吗?您能否请我举一个从 Pub/Sub 读取代码并用 java 写入 BigQuery 的示例?
    • 我相信这个例子同时使用了github.com/apache/beam/blob/master/examples/java8/src/main/java/…。是的,Python 最终会赶上 Java(可能通过 Beam 目前正在开发的可移植性框架,它将允许 Python 管道使用 Java 转换),但我无法预测时间线会是什么。
    猜你喜欢
    • 2017-05-10
    • 1970-01-01
    • 1970-01-01
    • 2018-06-21
    • 2020-06-22
    • 1970-01-01
    • 2017-07-21
    • 1970-01-01
    • 2013-06-20
    相关资源
    最近更新 更多