[Posted]: 2021-04-20 22:14:09
[Question]:
I have read these posts:
They were helpful, and I ended up building something similar for Pub/Sub messages such as {"id": "1"} (just for testing):
import apache_beam as beam
import pyarrow as pa

# `project` and `pipeline_options` are defined elsewhere in the script.
subscription = f"projects/{project}/subscriptions/test-subscriber"
with beam.Pipeline(options=pipeline_options) as p:
    # ReadFromPubSub emits each message payload as bytes.
    records = p | 'Read' >> beam.io.ReadFromPubSub(subscription=subscription)
    _ = records | 'Write' >> beam.io.parquetio.WriteToParquet(
        'gs://<bucket>/parquet/output/new',
        pa.schema([('id', pa.string())]),
        file_name_suffix=".parquet"
    )
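One thing that may be worth checking (an assumption, not a confirmed diagnosis of the "Workflow failed" error): ReadFromPubSub emits each message payload as raw bytes, while WriteToParquet writes records as Python dicts keyed by the schema's field names. A minimal sketch of a conversion step follows. The helper name `to_parquet_record` is hypothetical, and it assumes every message is a JSON object with an "id" field.

```python
import json


def to_parquet_record(payload: bytes) -> dict:
    """Decode a Pub/Sub payload such as b'{"id": "1"}' into a dict for WriteToParquet.

    Hypothetical helper: assumes each message is a JSON object whose "id"
    should be stored as a string, matching pa.schema([('id', pa.string())]).
    """
    message = json.loads(payload.decode("utf-8"))
    # Keep only the schema's field and coerce it to str so it matches pa.string().
    return {"id": str(message["id"])}


if __name__ == "__main__":
    print(to_parquet_record(b'{"id": "1"}'))  # {'id': '1'}
    print(to_parquet_record(b'{"id": 2, "extra": true}'))  # {'id': '2'}
```

In the pipeline it would sit between the two steps, e.g. `records | 'Parse' >> beam.Map(to_parquet_record) | 'Write' >> ...`.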
The only error I can see is "Workflow failed.", and only with the DataflowRunner; with the DirectRunner I have no problems. Here is the command I use to run it:
python code/dataflow/pubsub_to_gcs.py \
--project=${PROJECT_NAME} \
--output_path=gs://"${BUCKET_NAME}"/dataflow_output \
--region=${REGION} \
--job_name=testdataflow \
--runner=DataflowRunner \
--staging_location=gs://${BUCKET_NAME}/staging_location \
--temp_location=gs://${BUCKET_NAME}/temp
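The command mixes a script-specific flag (`--output_path`) with standard Beam/Dataflow flags. A common pattern, used in Beam's own examples such as wordcount, is to parse the custom flags with `argparse.ArgumentParser.parse_known_args` and hand the remainder to `PipelineOptions`. The sketch below shows only the argparse half so it runs without Beam installed. `split_args` is a hypothetical name, the project and bucket values are placeholders, and how the original script actually handles `--output_path` is not shown in the question.

```python
import argparse
from typing import List, Tuple


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Separate script-specific flags from flags meant for Beam's PipelineOptions."""
    parser = argparse.ArgumentParser()
    # Script-specific flag from the run command; Beam itself does not define it.
    parser.add_argument("--output_path", required=True)
    # Anything argparse does not recognise (--runner, --project, ...) is returned untouched.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    known, rest = split_args([
        "--project=my-project",
        "--output_path=gs://my-bucket/dataflow_output",
        "--runner=DataflowRunner",
        "--temp_location=gs://my-bucket/temp",
    ])
    print(known.output_path)  # gs://my-bucket/dataflow_output
    print(rest)  # ['--project=my-project', '--runner=DataflowRunner', '--temp_location=gs://my-bucket/temp']
```

The `rest` list would then be passed on as `PipelineOptions(rest)`.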
Here are the logs for this job (the first row is the most recent entry):
insertId,"labels.""dataflow.googleapis.com/job_id""","labels.""dataflow.googleapis.com/job_name""","labels.""dataflow.googleapis.com/log_type""","labels.""dataflow.googleapis.com/region""",logName,receiveTimestamp,resource.labels.job_id,resource.labels.job_name,resource.labels.project_id,resource.labels.region,resource.labels.step_id,resource.type,severity,textPayload,timestamp
kj98obbko,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,INFO,Worker pool stopped.,2021-01-13T14:39:45.784300690Z
kj98obbkn,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,DEBUG,Cleaning up.,2021-01-13T14:39:45.751440904Z
kj98obbkm,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,ERROR,Workflow failed.,2021-01-13T14:39:45.733187585Z
kj98obbkl,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,DEBUG,Fusing consumer WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/StreamingPCollectionViewWriter into WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values,2021-01-13T14:39:45.666536683Z
kj98obbkk,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,DEBUG,Fusing consumer WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/Values into WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets,2021-01-13T14:39:45.651338237Z
kj98obbkj,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>/logs/dataflow.googleapis.com%2Fjob-message,2021-01-13T14:39:46.649539202Z,2021-01-13_06_39_40-5574094972724851911,testdataflow,504796790819,us-west1,,dataflow_step,DEBUG,Fusing consumer WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/MergeBuckets into WriteToParquet/Write/WriteImpl/FinalizeWrite/_DataflowIterableAsMultimapSideInput(MapToVoidKey2.out.0)/GroupByKey/ReadStream,2021-01-13T14:39:45.635839235Z
kj98obbki,2021-01-13_06_39_40-5574094972724851911,testdataflow,system,us-west1,projects/<proj>
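Since the logs were exported as CSV, a quick way to pull out only the entries at WARNING severity or above is to filter on the `severity` and `textPayload` columns. A minimal sketch using the standard `csv` module; the function name `error_messages` is hypothetical, and the sample keeps two rows from the export above with most columns dropped.

```python
import csv
import io
from typing import List, Tuple

# Cloud Logging severity levels at or above WARNING.
REPORTED = {"WARNING", "ERROR", "CRITICAL", "ALERT", "EMERGENCY"}


def error_messages(csv_text: str) -> List[Tuple[str, str, str]]:
    """Return (timestamp, severity, textPayload) for entries at WARNING or above."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        (row["timestamp"], row["severity"], row["textPayload"])
        for row in reader
        if row["severity"] in REPORTED
    ]


SAMPLE = """insertId,severity,textPayload,timestamp
kj98obbkn,DEBUG,Cleaning up.,2021-01-13T14:39:45.751440904Z
kj98obbkm,ERROR,Workflow failed.,2021-01-13T14:39:45.733187585Z
"""

if __name__ == "__main__":
    for entry in error_messages(SAMPLE):
        print(entry)  # ('2021-01-13T14:39:45.733187585Z', 'ERROR', 'Workflow failed.')
```

On this export it only surfaces "Workflow failed.", which suggests the underlying cause is recorded in logs that were not included in the export.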
For now I have given up on Dataflow, but if anyone knows where I should look, I would be grateful.
[Discussion]:
-
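Are these all the logs for your job? In the Cloud Logs viewer there should be a drop-down menu that lets you filter by label. For example, all of the log messages in your snippet appear to be dataflow.googleapis.com/job-message logs, but that drop-down lets you select "All logs", which will show many more log messages.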
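Along the lines of the comment's suggestion, one can query every log for the job rather than just the job-message log by filtering on the resource fields that appear in the CSV header (`resource.type`, `resource.labels.job_id`) in the Cloud Logging query language, optionally with a severity threshold. A small sketch that builds such a filter string; `job_logs_filter` is a hypothetical helper, and it assumes the worker logs share the `dataflow_step` resource type seen in the export. The resulting string could be pasted into the Logs Explorer or passed as the filter argument of `gcloud logging read`.

```python
def job_logs_filter(job_id: str, min_severity: str = "") -> str:
    """Build a Cloud Logging filter matching every log of one Dataflow job.

    No logName clause is added, so logs other than job-message are included too.
    """
    clauses = [
        'resource.type="dataflow_step"',
        f'resource.labels.job_id="{job_id}"',
    ]
    if min_severity:
        # Severity comparisons follow level order (DEBUG < INFO < WARNING < ERROR ...).
        clauses.append(f"severity>={min_severity}")
    return " AND ".join(clauses)


if __name__ == "__main__":
    print(job_logs_filter("2021-01-13_06_39_40-5574094972724851911", "ERROR"))
```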
Tags: python-3.x google-cloud-platform apache-beam dataflow