【Question Title】: Error when creating Google Dataflow template file
【Posted】: 2021-02-16 16:23:38
【Question】:

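I am trying to use a template to schedule a Dataflow job that ends after a set amount of time. I can do this successfully from the command line, but when I try to do it with Google Cloud Scheduler, I get an error while creating the template.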

The error is

File "pipelin_stream.py", line 37, in <module>
    main()
  File "pipelin_stream.py", line 34, in main
    result.cancel()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1638, in cancel
    raise IOError('Failed to get the Dataflow job id.')
IOError: Failed to get the Dataflow job id.

The command I use to create the template is

python pipelin_stream.py \
--runner Dataflowrunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--template_location $BUCKET/templates/time_template_test \
--streaming

This is my pipeline file

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys


PROJECT = 'projectID'
schema = 'ex1:DATE, ex2:STRING'
TOPIC = "projects/topic-name/topics/scraping-test"

def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(region='us-central1', service_account_email='email'))

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | 'Decode' >> beam.Map(lambda x:x.decode('utf-8'))
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('tablename'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish(duration=3000)
    result.cancel()   # If the pipeline has not finished, you can cancel it

if __name__ == '__main__':
    logger = logging.getLogger().setLevel(logging.INFO)
    main()

Does anyone know why I am getting this error?

【Question Comments】:

    Tags: google-cloud-platform google-dataflow


    【Solution 1】:

    The error is raised by the cancel function once the wait time has elapsed, and it appears to be harmless.

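    To prove this, I managed to reproduce your exact issue from my virtual machine using Python 3.5. The template is created at the path given by --template_location and can be used to run jobs. Note that I needed to make some changes to your code for it to actually work in Dataflow.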

    In case it is useful to you, this is the pipeline code I ended up using

    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import pubsub_v1
    from google.cloud import bigquery
    import apache_beam as beam
    import logging
    import argparse
    import datetime
    
    # Fill in these values so they are used as defaults
    # Note that the table in BQ needs to have the column names message_body and publish_time
    
    Table = 'projectid:datasetid.tableid'
    schema = 'message_body:STRING, publish_time:TIMESTAMP'
    TOPIC = "projects/<projectid>/topics/<topicname>"
    
    class AddTimestamps(beam.DoFn):
        def process(self, element, publish_time=beam.DoFn.TimestampParam):
            """Processes each incoming element by extracting the Pub/Sub
            message and its publish timestamp into a dictionary. `publish_time`
            defaults to the publish timestamp returned by the Pub/Sub server. It
            is bound to each element by Beam at runtime.
            """
    
            yield {
                "message_body": element.decode("utf-8"),
                "publish_time": datetime.datetime.utcfromtimestamp(
                    float(publish_time)
                ).strftime("%Y-%m-%d %H:%M:%S.%f"),
            }
    
    
    def main(argv=None):
    
        parser = argparse.ArgumentParser()
        parser.add_argument("--input_topic", default=TOPIC)
        parser.add_argument("--output_table", default=Table)
        args, beam_args = parser.parse_known_args(argv)
        # save_main_session must be True because module-level imports (mostly datetime) are used inside the DoFn
        # Add service_account_email='email' to the options to specify a custom service account
        options = PipelineOptions(beam_args, save_main_session=True, region='us-central1')
        p = beam.Pipeline(options=options)
    
        (p
            | 'ReadData' >> beam.io.ReadFromPubSub(topic=args.input_topic).with_output_types(bytes)
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(args.output_table, schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
        result = p.run()
        # Warning: cancel() does not work properly when building a template (no job ID exists yet)
        result.wait_until_finish(duration=3000)
        result.cancel()   # Cancel the streaming pipeline after a while to avoid consuming more resources
    
    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        main()
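
    The row that AddTimestamps yields can be checked locally without Beam. The sketch below reproduces the same conversion with the standard library only; the helper name to_bigquery_row and the sample timestamp are assumptions made for illustration, not part of the original pipeline.

```python
from datetime import datetime, timezone


def to_bigquery_row(payload: bytes, publish_time: float) -> dict:
    """Build the same dict shape the DoFn yields (hypothetical helper)."""
    # Interpret the epoch seconds explicitly as UTC.
    ts = datetime.fromtimestamp(publish_time, tz=timezone.utc)
    return {
        "message_body": payload.decode("utf-8"),
        "publish_time": ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
    }


# Sample input: a UTF-8 payload and an epoch timestamp in seconds.
row = to_bigquery_row(b"hello", 1613492618.0)
print(row)
```

    The keys of the returned dict are the column names the BigQuery table is expected to have.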
    

    Then I ran the following commands:

    # Fill accordingly
    PROJECT="MYPROJECT-ID"
    BUCKET="MYBUCKET"
    TEMPLATE_NAME="TRIAL"
    
    # create the template
    python3 -m templates.template-pubsub-bigquery \
      --runner DataflowRunner \
      --project $PROJECT \
      --staging_location gs://$BUCKET/staging \
      --temp_location gs://$BUCKET/temp \
      --template_location gs://$BUCKET/templates/$TEMPLATE_NAME \
      --streaming
    

    to create the template (this produces the error you mentioned, but the template is still created), and

    # Fill job-name and gcs location accordingly
    # Uncomment and fill the parameters should you want to use your own
    
    gcloud dataflow jobs run <job-name> \
            --gcs-location "gs://<MYBUCKET>/dataflow/templates/mytemplate" 
       #     --parameters input_topic="",output_table=""
    

    to run the pipeline.

    As I said, the template is created correctly and the pipeline works as expected.
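
    Since the question mentions Cloud Scheduler, here is a rough sketch of the HTTP request such a scheduler job could send to launch the template through the Dataflow REST method projects.locations.templates.launch. The helper name build_launch_request and all sample values are assumptions. The sketch only builds the URL and JSON body; authentication (for example an OAuth token configured on the scheduler job) is left out.

```python
import json
from urllib.parse import quote, urlencode


def build_launch_request(project, location, gcs_path, job_name, parameters=None):
    """Return (url, body) for a templates.launch POST (hypothetical helper)."""
    url = (
        "https://dataflow.googleapis.com/v1b3/projects/{}/locations/{}"
        "/templates:launch?{}"
    ).format(
        quote(project, safe=""),
        quote(location, safe=""),
        urlencode({"gcsPath": gcs_path}),  # template path goes in the query string
    )
    # Request body: the job name plus any template parameters.
    body = {"jobName": job_name, "parameters": parameters or {}}
    return url, json.dumps(body)


url, body = build_launch_request(
    "my-project", "us-central1", "gs://my-bucket/templates/TRIAL", "timed-job"
)
print(url)
print(body)
```

    Using the same job name on every launch keeps the job easy to find later when it has to be cancelled.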


    Edit

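    Indeed, the cancel function does not work properly in a template. The problem seems to be that it needs the job ID at template creation time, which of course does not exist yet, so the call is effectively skipped.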
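
    One way to avoid the IOError while building the template is to skip the wait/cancel step whenever --template_location is passed. This is a minimal sketch with hypothetical helpers (is_template_build, finish). It only silences the error at build time and does not make a job launched from the template stop on its own.

```python
import argparse


def is_template_build(argv):
    """Return True when --template_location is among the given arguments."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--template_location", default=None)
    known, _ = parser.parse_known_args(argv)
    return known.template_location is not None


def finish(result, argv, wait_ms=3000):
    """Wait and cancel only when a real job was submitted."""
    if is_template_build(argv):
        # No job exists yet, so there is no job ID to cancel.
        return "template-only"
    result.wait_until_finish(duration=wait_ms)
    result.cancel()
    return "cancelled"
```

    In the pipeline above this would be called as finish(p.run(), beam_args) in place of the explicit wait_until_finish and cancel calls.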

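    I found this other post, which deals with extracting the job ID of a pipeline. I tried a few tweaks to make it work inside the template code itself, but I don't think that is necessary. Given that you want to schedule the executions, I would go for the simpler option: launch the streaming pipeline template at a specific time (for example 9:01 GMT) and cancel the pipeline with the following script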

    import logging, re,os
    from googleapiclient.discovery import build
    from oauth2client.client import GoogleCredentials
    
    def retrieve_job_id():
      #Fill as needed
      project = '<project-id>'
      job_prefix = "<job-name>"
      location = '<location>'
    
      logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))
    
      try:
        credentials = GoogleCredentials.get_application_default()
        dataflow = build('dataflow', 'v1b3', credentials=credentials)
    
        result = dataflow.projects().locations().jobs().list(
          projectId=project,
          location=location,
        ).execute()
    
        job_id = "none"
    
        for job in result.get('jobs', []):
          if re.search(re.escape(job_prefix), job['name']):
            job_id = job['id']
            break
    
        logging.info("Job ID: {}".format(job_id))
        return job_id
    
      except Exception:
        logging.exception("Error retrieving Job ID")
        raise
    
    
    job_id = retrieve_job_id()
    if job_id != "none":  # only cancel when a matching job was found
      os.system('gcloud dataflow jobs cancel {}'.format(job_id))
    

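    Run this script at a later time (for example 9:05 GMT). It assumes that you launch the job with the same job name every time; it takes the most recent job with that name and cancels it. I tried it several times and it works well.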
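
    The "most recent job with that name" selection can be made explicit instead of relying on the order in which the API returns jobs. The sketch below works on a list of job dicts shaped like the Dataflow jobs.list response (fields name, id, createTime, currentState). The helper name latest_job_id and the sample data are assumptions, and the sketch assumes all createTime values share the same RFC 3339 format so that string comparison orders them correctly.

```python
def latest_job_id(jobs, job_prefix, active_states=("JOB_STATE_RUNNING",)):
    """Return the id of the newest active job whose name starts with job_prefix."""
    candidates = [
        job for job in jobs
        if job.get("name", "").startswith(job_prefix)
        and job.get("currentState") in active_states
    ]
    if not candidates:
        return None  # nothing to cancel
    # Uniformly formatted RFC 3339 timestamps sort chronologically as strings.
    return max(candidates, key=lambda job: job["createTime"])["id"]


sample_jobs = [
    {"name": "timed-job", "id": "old", "createTime": "2021-02-15T09:01:00Z",
     "currentState": "JOB_STATE_CANCELLED"},
    {"name": "timed-job", "id": "new", "createTime": "2021-02-16T09:01:00Z",
     "currentState": "JOB_STATE_RUNNING"},
    {"name": "other-job", "id": "other", "createTime": "2021-02-16T10:00:00Z",
     "currentState": "JOB_STATE_RUNNING"},
]
print(latest_job_id(sample_jobs, "timed-job"))
```

    Filtering on the job state keeps the script from trying to cancel a job that was already stopped on a previous day.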

    【Discussion】:

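    • Hi, thanks for looking into this. I realize the error is caused by the cancel function, but I want to be able to stop the Dataflow job after a certain time. Do you know whether it is possible to cancel the job?
    • I couldn't get the cancel function to work. I think it cannot work given the way templates are built, but maybe there is an approach we haven't considered yet. In any case, I added a workaround that lets me cancel the pipeline. Hope it helps.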