【Posted】: 2021-01-22 17:38:26
【Question】:
I have the following pipeline:
import apache_beam as beam

# pipeline_options must enable streaming (e.g. --streaming): the source is Pub/Sub.
with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
        | 'Fetch from API 1' >> beam.Map(fetch_1)
        # Transform labels must be unique within a pipeline, hence the 1/2 suffixes.
        | 'Filter out invalid data 1' >> beam.Filter(lambda item: item is not None)
        # fetch_2 is assumed to be the API 2 helper (the post called fetch_1 twice).
        | 'Fetch from API 2' >> beam.Map(fetch_2)
        | 'Filter out invalid data 2' >> beam.Filter(lambda item: item is not None)
        | 'Parse Article to BQ json' >> beam.Map(parse_to_bq_json)
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table='BQ_TABLE_NAME',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=5,
        )
    )
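`ReadFromPubSub(...).with_output_types(bytes)` hands each message payload to the first `Map` as raw bytes, and the `Filter(lambda item: item is not None)` steps only work if the fetch functions return `None` for anything they cannot use. A minimal sketch of that contract, assuming JSON payloads that carry an `id` field; `fetch_article` and `is_valid` are hypothetical stand-ins, not the asker's actual `fetch_1`:

```python
import json
from typing import Any, Dict, Optional


def fetch_article(message: bytes) -> Optional[Dict[str, Any]]:
    """Hypothetical first Map step: decode a Pub/Sub payload or return None."""
    try:
        payload = json.loads(message.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed message: returning None lets the Filter step drop it.
        return None
    if not isinstance(payload, dict) or "id" not in payload:
        return None  # assumed requirement: every article carries an "id"
    # A real implementation would call the external API here.
    return payload


def is_valid(item: Optional[Dict[str, Any]]) -> bool:
    """Same predicate as the pipeline's `lambda item: item is not None`."""
    return item is not None


messages = [b'{"id": 1, "title": "a"}', b"not json", b'{"title": "no id"}']
kept = [m for m in map(fetch_article, messages) if is_valid(m)]
print(kept)
```

In the pipeline these would plug in as `beam.Map(fetch_article)` and `beam.Filter(is_valid)`; a named predicate also makes the two filter steps easier to tell apart in the Dataflow UI than two identical lambdas.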
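When I run it with the DirectRunner it works as expected, but on Dataflow (DataflowRunner) it ends with:
Job did not reach to a terminal state after waiting indefinitely.
Nothing more, nothing less. There is very little documentation or any other mention of similar cases, so any feedback is welcome.
A sample of the last few log lines: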
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.574Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/PairWithVoidKey
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.603Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.637Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.672Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/StreamingPCollectionViewWriter into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.705Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables into WriteToBigQuery/BigQueryBatchFileLoads/WaitForCopyJobs/WaitForCopyJobs
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.739Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.772Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.822Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.848Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.880Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/Delete into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.915Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Impulse
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.939Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Map(decode) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.008Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithTempTables/ParDo(TriggerLoadJobs)/ParDo(TriggerLoadJobs)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.033Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.165Z: JOB_MESSAGE_ERROR: Workflow failed.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.205Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.252Z: JOB_MESSAGE_BASIC: Worker pool stopped.
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1477, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 151, in <module>
    run(args, pipeline_args)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 108, in run
    p.run().wait_until_finish()
  File "/Users/XXXX/.virtualenvs/app/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1675, in wait_until_finish
    'Job did not reach to a terminal state after waiting indefinitely.')
AssertionError: Job did not reach to a terminal state after waiting indefinitely.
Edit 1: Adding the output from the console logs (unfortunately there isn't much information there):
{
textPayload: "Workflow failed."
insertId: "1rtvonbcgg5"
resource: {
type: "dataflow_step"
labels: {
project_id: "437008213460"
job_name: "app-test"
step_id: ""
region: "europe-west1"
job_id: "2021-01-22_11_22_27-2214838125974198028"
}
}
timestamp: "2021-01-22T19:22:37.425862432Z"
severity: "ERROR"
labels: {
dataflow.googleapis.com/job_id: "2021-01-22_11_22_27-2214838125974198028"
dataflow.googleapis.com/job_name: "app-test"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "europe-west1"
}
logName: "projects/some-project-eu/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2021-01-22T19:22:39.086520796Z"
}
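The entry above comes only from the job-message log (its `logName` ends in `dataflow.googleapis.com%2Fjob-message`). Filtering on the job ID instead of the log name should also surface worker and harness logs for the same job, if any were written before the failure. A small sketch that builds such a Logs Explorer query; `dataflow_log_filter` is a hypothetical helper, and the field names are taken from the entry above:

```python
def dataflow_log_filter(job_id: str, min_severity: str = "WARNING") -> str:
    """Build a Cloud Logging query covering every log stream of one Dataflow job."""
    clauses = [
        'resource.type="dataflow_step"',
        f'resource.labels.job_id="{job_id}"',
        # No logName clause, so worker/harness logs are included, not only job-message.
        f"severity>={min_severity}",
    ]
    return " AND ".join(clauses)


print(dataflow_log_filter("2021-01-22_11_22_27-2214838125974198028"))
```

The resulting string can be pasted into Logs Explorer, or passed as the `filter_` argument of `list_entries` in the `google-cloud-logging` client library.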
Edit 2: Adding a simplified version:
from datetime import datetime

import apache_beam as beam


def foo(stream_data):
    # WriteToBigQuery expects one dict per row, keyed by column name.
    return {"foo_ts": str(datetime.now())}


with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
        | 'Do foo' >> beam.Map(foo)
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table=bq_project + ':' + bq_dataset + '.' + TABLE_NAME,
            schema={"fields": [{"name": "foo_ts", "type": "TIMESTAMP"}]},
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=5,
        )
    )
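Separately from the startup failure, `triggering_frequency=5` with `FILE_LOADS` means Beam can start load jobs against the destination table up to about every 5 seconds. A back-of-the-envelope check against BigQuery's daily per-table load-job quota, assuming roughly one load job per trigger for a single table (large batches can produce more, idle periods fewer); the 1,500 figure is the documented limit at the time of writing and should be checked against the current quota page:

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60
# Assumed BigQuery limit on load jobs per table per day; verify current quotas.
LOAD_JOBS_PER_TABLE_PER_DAY = 1500


def load_jobs_per_day(triggering_frequency_s: int) -> int:
    """Upper estimate of load jobs per table per day, one job per trigger."""
    return SECONDS_PER_DAY // triggering_frequency_s


def min_frequency_within_quota(quota: int = LOAD_JOBS_PER_TABLE_PER_DAY) -> int:
    """Smallest triggering_frequency (seconds) that keeps one table under quota."""
    return math.ceil(SECONDS_PER_DAY / quota)


print(load_jobs_per_day(5))
print(min_frequency_within_quota())
```

With a 5-second trigger the estimate is 17,280 load jobs per day, more than ten times the assumed limit, while anything from about 58 seconds upward stays below it.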
And my run command:
python streaming_app.py \
  --input_subscription projects/awesome_project/subscriptions/sub-test \
  --runner DataflowRunner \
  --bq_project awesome_project \
  --bq_dataset awesome_dataset \
  --region europe-west1 \
  --temp_location gs://awesome-nlp \
  --job_name hope-it-works-test \
  --setup_file ./setup.py \
  --max_num_workers 10
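The traceback calls `run(args, pipeline_args)`, which suggests the custom flags are split from the Beam flags with `argparse`'s `parse_known_args`, and the command above does not pass `--streaming`. If streaming is not already enabled in code, a sketch like the following would guarantee it; the parser and `split_args` are hypothetical reconstructions, not the asker's actual `app.py`:

```python
import argparse
from typing import List, Tuple


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Separate the app's own flags from the ones meant for Beam/Dataflow."""
    parser = argparse.ArgumentParser()
    # Hypothetical parser mirroring the custom flags in the command above.
    parser.add_argument("--input_subscription", required=True)
    parser.add_argument("--bq_project", required=True)
    parser.add_argument("--bq_dataset", required=True)
    known, pipeline_args = parser.parse_known_args(argv)
    # A Pub/Sub source needs a streaming job; add --streaming if it was not passed.
    if "--streaming" not in pipeline_args:
        pipeline_args.append("--streaming")
    return known, pipeline_args


argv = [
    "--input_subscription", "projects/awesome_project/subscriptions/sub-test",
    "--runner", "DataflowRunner",
    "--bq_project", "awesome_project",
    "--bq_dataset", "awesome_dataset",
    "--region", "europe-west1",
]
known, pipeline_args = split_args(argv)
print(known.bq_dataset)
print(pipeline_args)
```

The remaining `pipeline_args` would then be handed to `PipelineOptions(pipeline_args)`, which understands `--runner`, `--region` and `--streaming`; the custom values stay on `known`.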
Edit 3: Also adding the job ID of one of the failed jobs: 2021-01-24_06_31_49-168256842937211337
【Discussion】:
-
Could you check the Cloud Console to see whether any other logs point to the error (for example, quota errors)?
-
Unfortunately, there is nothing in the console error logs (I've added that output to my post). Quotas also look green, with no issues.
-
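It's strange that the job fails at all. A streaming job shouldn't fail because of BigQuery sink problems (it retries forever), so my guess is that either you are not running in streaming mode, or Dataflow ran into a problem while starting your job. It's hard to say without the detailed job logs, so I'd suggest contacting Google Cloud Support.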
-
@miro Which versions of Python and Apache Beam are you using?
-
@NirleyGupta, Python 3.8 and beam=2.27.0
Tags: google-bigquery google-cloud-dataflow