【问题标题】:Capturing failures when writing to BigQuery in Dataflow pipeline在 Dataflow 管道中写入 BigQuery 时捕获失败
【发布时间】:2018-10-18 05:15:28
【问题描述】:

我有一个从 PubSub 主题读取事件数据的数据流管道。收到消息时,我会执行转换步骤,以使事件数据适合我所需的 BigQuery 架构。但是,如果我创建的输入不符合模式,我就会遇到问题。显然它会无限重试写入 BigQuery:

Count: 76   RuntimeError: Could not successfully insert rows to BigQuery table

目前我正在做很多手动检查输入是否符合架构,但是,如果我没有考虑,我会累积 RuntimeErrors。有没有办法尝试写入 BigQuery,以防无法使用原始输入执行其他操作?或者,有没有办法尝试多次写入,否则会在不添加新的 RuntimeErrors 的情况下静默失败?

编辑:我正在使用 python SDK。这是我的简化管道以进一步澄清:

with beam.Pipeline(options=options) as pipeline:
    # Read messages from PubSub
    event = (pipeline
             | 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))

    output = (event
              | 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))

    # Write to Big Query
    _ = (output
         | 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
             table=table,
             dataset=dataset,
             project=project,
             schema=schema,
             create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
             write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))

如果我的表中没有“输入”列,则作业将终止。在查看https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279 之后,似乎这就是这种行为的原因。通过自定义https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187 并且不引发 RuntimeError,我可以克服我的问题,但是,这感觉很麻烦。有人对更简单的方法有建议吗?

【问题讨论】:

  • 您是否运行了模板数据流管道之一来执行此操作?是否有关于该错误的更多信息?
  • 我在 python 中创建了一个自定义数据流管道。对于适合架构的有效负载,作业运行良好。自定义验证过滤问题列表,将其写入单独的调试表,这也可以正常工作。但是,在我没有考虑到的情况下,这份工作就会消失。我使用简化的管道编辑了问题以提供更多详细信息。
  • 与您的情况相关的公共功能请求已打开。你可以在这里关注它 - issuetracker.google.com/issues/110334821
  • 哦,太好了,我真的很感激!

标签: python google-bigquery google-cloud-dataflow google-cloud-pubsub


【解决方案1】:

如果您自己编写了管道,您应该能够在 BigQueryIO 上使用 setFailedInsertRetryPolicyInsertRetryPolicy.neverRetry

【讨论】:

  • 您好,谢谢您的回答。但是,我使用的是 python,找不到类似的东西。我编辑了问题以提供更多详细信息
  • 啊,python 还没有等效的功能。但我理解您希望您的管道继续移动,即使元素失败。
【解决方案2】:

Beam - 用于流式传输的 Python SDK 非常有限。

https://beam.apache.org/documentation/sdks/python-streaming/

从 Beam SDK 版本 2.5.0 开始,Python 流式管道执行实验性可用(有一些限制)。

Python 流式执行目前不支持以下功能。

一般 Beam 功能: 这些不受支持的 Beam 功能适用于所有跑步者。

  • 状态和计时器 API
  • 自定义源 API
  • 可拆分 DoFn API
  • 延迟数据的处理
  • 用户自定义的自定义WindowFn

DataflowRunner 特定功能: 此外,DataflowRunner 目前不支持 Python 流式执行的以下 Cloud Dataflow 特定功能。

  • 流式自动缩放
  • 更新现有管道
  • 云数据流模板
  • 一些监视功能,例如毫秒计数器、显示数据、度量和转换的元素计数。但是,支持源的日志记录、水印和元素计数。

更多信息在这里:https://beam.apache.org/documentation/sdks/python-streaming/#unsupported-features

还可以查看 DataFlow 文档中的以下发行说明:

【讨论】:

  • 感谢您,但是,我看不到与我的问题相关的内容。此外,还有一些让我感到困惑的事情:它说“Python 流式管道执行在实验上可用 [...] 从 Beam SDK 版本 2.5.0 开始。” (目前最新版本为 2.4.0)符合 Dataflow SDK for python 的发行说明。但是,像我上面发布的管道在流模式下为我工作,但有一些(出于我的目的是次要的)限制。我错过了什么吗?
  • 可以使用 Dataflow python SDK 进行流式传输,但某些功能可能会更改,并且某些功能尚无法使用。因此,一些简单的流式传输示例可能会起作用,因为该功能可能已经开发。我想关键是他们将拥有 2.5.0 发布的大部分功能。
【解决方案3】:

可能对您有所帮助(使用直接运行器时)是从插入中获取 ['FailedRows'] 到

 final_to_bq = (data
                   | 'Write to BQ' >> beam.io.WriteToBigQuery( ... )
)

然后:

print_failed_rows = (final_to_bq['FailedRows']
                         | 'print failed' >> beam.ParDo(Printer())
                         )

这对使用 DirectRunner 很有帮助...但还不能使用 DatflowRunner...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-10
    • 1970-01-01
    • 1970-01-01
    • 2019-05-01
    相关资源
    最近更新 更多