【问题标题】:Dataflow / BigQuery FILE_LOADS: Job did not reach to a terminal state after waiting indefinitely数据流/BigQuery FILE_LOADS:无限期等待后作业未达到终端状态
【发布时间】:2021-01-22 17:38:26
【问题描述】:

我有以下管道

 with beam.Pipeline(options=pipeline_options) as pipeline:

        (
                p
                | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
                | 'Fetch from API 1' >> beam.Map(fetch_1)
                | 'Filter out invalid data' >> beam.Filter(lambda item: item is not None)
                | 'Fetch from API 2' >> beam.Map(fetch_1)
                | 'Filter out invalid data' >> beam.Filter(lambda item: item is not None)
                | 'Parse Article to BQ json' >> beam.Map(parse_to_bq_json)
                | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table='BQ_TABLE_NAME',
                                                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                               method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                                                               triggering_frequency=5
                                                               )
        )

当我使用 DirectRunner 运行它时按预期运行但以

结束
 Job did not reach to a terminal state after waiting indefinitely.

不多也不少。关于类似案例的文档或其他提及非常有限,因此欢迎提供任何反馈。

最后几行的示例:

INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.574Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/PairWithVoidKey
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.603Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.637Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.672Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/StreamingPCollectionViewWriter into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.705Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables into WriteToBigQuery/BigQueryBatchFileLoads/WaitForCopyJobs/WaitForCopyJobs
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.739Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.772Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.822Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.848Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.880Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/Delete into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.915Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Impulse
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.939Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Map(decode) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.008Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithTempTables/ParDo(TriggerLoadJobs)/ParDo(TriggerLoadJobs)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.033Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.165Z: JOB_MESSAGE_ERROR: Workflow failed.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.205Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.252Z: JOB_MESSAGE_BASIC: Worker pool stopped.
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1477, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 151, in <module>
    run(args, pipeline_args)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 108, in run
    p.run().wait_until_finish()
  File "/Users/XXXX/.virtualenvs/app/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1675, in wait_until_finish
    'Job did not reach to a terminal state after waiting indefinitely.')
AssertionError: Job did not reach to a terminal state after waiting indefinitely.

编辑 1:从控制台日志添加输出(不幸的是那里没有太多信息):

{
textPayload: "Workflow failed."
insertId: "1rtvonbcgg5"
resource: {
type: "dataflow_step"
labels: {
project_id: "437008213460"
job_name: "app-test"
step_id: ""
region: "europe-west1"
job_id: "2021-01-22_11_22_27-2214838125974198028"
}
}
timestamp: "2021-01-22T19:22:37.425862432Z"
severity: "ERROR"
labels: {
dataflow.googleapis.com/job_id: "2021-01-22_11_22_27-2214838125974198028"
dataflow.googleapis.com/job_name: "app-test"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "europe-west1"
}
logName: "projects/some-project-eu/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2021-01-22T19:22:39.086520796Z"
}

编辑 2:添加简化版本:

def foo(stream_data):
    return str(datetime.now())

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
        | 'Do foo' >> beam.Map(foo)
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table=bq_project + ':' + bq_dataset + '.' + TABLE_NAME,
                                                       schema={"fields": [{"name": "foo_ts", "type": "TIMESTAMP"}]},
                                                       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                       method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                                                       triggering_frequency=5,
                                                       )
    )

还有我的运行命令:

streaming_app.py 
  --input_subscription projects/awesome_project/subscriptions/sub-test 
  --runner DataflowRunner 
  --bq_project awesome_project 
  --bq_dataset awesome_dataset 
  --region europe-west1 
  --temp_location gs://awesome-nlp 
  --job_name hope-it-works-test 
  --setup_file ./setup.py 
  --max_num_workers 10

编辑 3:还添加其中一个失败作业的作业 ID:2021-01-24_06_31_49-168256842937211337

【问题讨论】:

  • 您能否检查 Cloud Console 以查看是否有任何其他日志会指向错误? (例如,配额错误)。
  • 不幸的是,控制台错误日志中没有任何内容(添加到我的帖子中)。配额似乎也是绿色的,没有任何问题
  • 奇怪的是作业居然失败了。流式作业不应因 BigQuery 接收器问题而失败(它将永远重复尝试),因此我猜您没有在流式传输模式下运行,或者 Dataflow 在开始您的作业时遇到了一些问题。如果不查看详细的作业日志很难说,所以我建议联系 Google Cloud 支持。
  • @miro 您使用的是哪个版本的 python 和 Apache Beam?
  • @NirleyGupta,python 3.8 和 beam=2.27.0

标签: google-bigquery google-cloud-dataflow


【解决方案1】:

您能否尝试将您的代码与通过example 提供的示例数据流运行程序代码进行比较。由于我看不到您的完整代码,但是如果您尝试将代码拟合到上面给出的示例中,它将在 Dataflow runner 上运行。

编辑 1:

请在下面找到一个工作示例:-


#------------Import Lib-----------------------#
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os, sys, time
import argparse
import logging
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxxxxxxxx'
Pubsub_subscription='projects/xxxxxxxxxxx/subscriptions/Pubsubdemo_subscription'
#plitting Of Records----------------------#

class Transaction_ECOM(beam.DoFn):
    def process(self, element):
        logging.info(element)

        result = json.loads(element)
        data_bkt = result.get('_bkt','null')
        data_cd=result.get('_cd','null')
        data_indextime=result.get('_indextime','0')
        data_kv=result.get('_kv','null')
        data_raw=result['_raw']
        data_raw1=data_raw.replace("\n", "")
        data_serial=result.get('_serial','null')
        data_si = str(result.get('_si','null'))
        data_sourcetype =result.get('_sourcetype','null')
        data_subsecond = result.get('_subsecond','null')
        data_time=result.get('_time','null')
        data_host=result.get('host','null')
        data_index=result.get('index','null')
        data_linecount=result.get('linecount','null')
        data_source=result.get('source','null')
        data_sourcetype1=result.get('sourcetype','null')
        data_splunk_server=result.get('splunk_server','null')

        return [{"datetime_indextime": time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(int(data_indextime))), "_bkt": data_bkt, "_cd": data_cd,  "_indextime": data_indextime,  "_kv": data_kv,  "_raw": data_raw1,  "_serial": data_serial,  "_si": data_si, "_sourcetype": data_sourcetype, "_subsecond": data_subsecond, "_time": data_time, "host": data_host, "index": data_index, "linecount": data_linecount, "source": data_source, "sourcetype": data_sourcetype1, "splunk_server": data_splunk_server}]



def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)



    data_loading = (
        p1
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=Pubsub_subscription)

    )


    project_id = "xxxxxxxxxxx"
    dataset_id = 'test123'
    table_schema_ECOM = ('datetime_indextime:DATETIME, _bkt:STRING, _cd:STRING, _indextime:STRING, _kv:STRING, _raw:STRING, _serial:STRING, _si:STRING, _sourcetype:STRING, _subsecond:STRING, _time:STRING, host:STRING, index:STRING, linecount:STRING, source:STRING, sourcetype:STRING, splunk_server:STRING')

        # Persist to BigQuery
        # WriteToBigQuery accepts the data as list of JSON objects

#---------------------Index = ITF----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Clean-ITF' >> beam.ParDo(Transaction_ECOM())
        | 'Write-ITF' >> beam.io.WriteToBigQuery(
                                                    table='CFF_ABC',
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema_ECOM,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                    ))

    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = '/home/vibhg/Splunk/CFF/xxxxxxxxxxx-abcder125.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()


它有几个额外的库,所以忽略它。

一些关键特性是:-

  1. 将“流式传输”设置为“真”
  2. 订阅名称的格式应为 ''projects/>xxxxxxxxxxx>/subscriptions/>订阅名称>''

在主题上发布的样本数据,将从订阅中获取如下:-

{"_bkt": "A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
[vibhg@aiclassificationdev8 jobrun]$ head -2 ITF_202101251435
{"_bkt": "itf~412~2EE5428B-7CEA-4C49-A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "9-A1E8-A5370FECA146", "_cd": "412:140787671", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:58,659 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsCreateOrderV2\", BsExecutionTime=\"00:00:01.568\", OrderNo=\"374942155\", CountryCode=\"US\", ClientSystem=\"owfe-webapp\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsCreateOrderV2\"], [userId=\"s-salja1-u-irssemal\"], [userIdRegion=\"NA\"], [msgId=\"6652311fece28966\"], [msgIdSeq=\"25\"], [originator=\"SellingApi\"] ", "_serial": "1", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".659", "_time": "2021-01-25 14:28:58.659 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}

您可以使用以下命令执行脚本:-

python script.py --region europe-west1 --project xxxxxxx --temp_location gs://temp/temp --runner DataflowRunner --job_name name

因为您似乎错过了在代码中设置 Streaming 参数。

【讨论】:

  • 代码作为 DirectRunner 可以正常工作,但在我使用 DataFlowRunner 推送它后以错误结束。如果我将最后一个管道步骤更改为 STREAMING_INSERTS,则一切正常,因为 Direct 或 Dataflow 运行。在出现错误之前,我只能看到一堆“Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads”日志(没有错误)。有一件事要提的是,我要写的表是由一个字段的时间戳分区
  • 如果我理解正确,通过将 STREAMING_INSERTS 添加到管道中,它可以作为 Direct 和 Dataflow 运行器完美运行吗?你的问题解决了吗?
  • 不,问题是在 FILE_LOADS 模式下运行 WriteToBigQuery 时我无法弄清楚上述问题。我在 FILE_LOADS 之后的原因是 STREAMING_INSERTS 不允许插入 1825-366 日期范围之外的分区表(我正在尝试处理)。
  • 将简化代码添加为 Edit 2(感谢您的帮助)
  • 不幸的是,我今天没有时间处理代码。我将在明天之前与您共享代码。
猜你喜欢
  • 1970-01-01
  • 2021-01-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-10-25
  • 2015-10-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多