【发布时间】:2018-10-18 05:15:28
【问题描述】:
我有一个从 PubSub 主题读取事件数据的数据流管道。收到消息时,我会执行转换步骤,以使事件数据适合我所需的 BigQuery 架构。但是,如果我创建的输入不符合模式,我就会遇到问题。显然它会无限重试写入 BigQuery:
Count: 76 RuntimeError: Could not successfully insert rows to BigQuery table
目前我正在做很多手动检查输入是否符合架构,但是,如果我没有考虑,我会累积 RuntimeErrors。有没有办法尝试写入 BigQuery,以防无法使用原始输入执行其他操作?或者,有没有办法尝试多次写入,否则会在不添加新的 RuntimeErrors 的情况下静默失败?
编辑:我正在使用 python SDK。这是我的简化管道以进一步澄清:
with beam.Pipeline(options=options) as pipeline:
# Read messages from PubSub
event = (pipeline
| 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))
output = (event
| 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))
# Write to Big Query
_ = (output
| 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
table=table,
dataset=dataset,
project=project,
schema=schema,
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
如果我的表中没有“输入”列,则作业将终止。在查看https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279 之后,似乎这就是这种行为的原因。通过自定义https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187 并且不引发 RuntimeError,我可以克服我的问题,但是,这感觉很麻烦。有人对更简单的方法有建议吗?
【问题讨论】:
-
您是否运行了模板数据流管道之一来执行此操作?是否有关于该错误的更多信息?
-
我在 python 中创建了一个自定义数据流管道。对于适合架构的有效负载,作业运行良好。自定义验证过滤问题列表,将其写入单独的调试表,这也可以正常工作。但是,在我没有考虑到的情况下,这份工作就会消失。我使用简化的管道编辑了问题以提供更多详细信息。
-
与您的情况相关的公共功能请求已打开。你可以在这里关注它 - issuetracker.google.com/issues/110334821
-
哦,太好了,我真的很感激!
标签: python google-bigquery google-cloud-dataflow google-cloud-pubsub