【问题标题】:Accessing Templated Runtime Parameters in Google Cloud Dataflow - Python在 Google Cloud Dataflow 中访问模板化运行时参数 - Python
【发布时间】:2018-12-18 13:43:02
【问题描述】:

我正在尝试为 Google Cloud Dataflow 创建自己的模板,以便可以从 GUI 执行作业,让其他人更容易执行。我按照教程,创建了自己的 PipelineOptions 类,并使用 parser.add_value_provider_argument() 方法填充它。然后,当我尝试使用 my_options.argname.get() 将这些参数传递到管道中时,我收到一个错误,告诉我该项目不是从运行时上下文中调用的。我不明白这一点。 args 不是定义管道图本身的一部分,它们只是输入文件名、输出表名等参数。

下面是代码。如果我对输入文件名、输出表名、写入处置和分隔符进行硬编码,它就可以工作。如果我用它们的 my_options.argname.get() 等效替换它们,它会失败。在所示的 sn-p 中,除了 outputBQTable 名称之外,我已经硬编码了所有内容,我在其中使用了 my_options.outputBQTable.get()。这失败了,并显示以下消息。

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: outputBQTable, type: str, default_value: 'dataflow_csv_reader_testing.names').get() 未从运行时上下文调用

感谢任何有关如何使其发挥作用的指导。

import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import csv
import argparse

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls,parser):
        parser.add_value_provider_argument('--inputGCS', type=str,
              default='gs://mybucket/df-python-csv-test/test-dict.csv',
              help='Input gcs csv file, full path and filename')
        parser.add_value_provider_argument('--delimiter', type=str,
              default=',',
              help='Character used as delimiter in csv file, default is ,')
        parser.add_value_provider_argument('--outputBQTable', type=str,
              default='dataflow_csv_reader_testing.names',
              help='Output BQ Dataset.Table to write to')
        parser.add_value_provider_argument('--writeDisposition', type=str,
              default='WRITE_APPEND',
              help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')

def main():
    optlist=PipelineOptions()
    my_options=optlist.view_as(MyOptions)
    p = apache_beam.Pipeline(options=optlist)
    (p
    | 'create'            >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
    | 'read gcs csv dict' >> apache_beam.FlatMap(lambda file: csv.DictReader(apache_beam.io.gcp.gcsio.GcsIO().open(file,'r'), delimiter='|'))
    | 'write bq record'   >> apache_beam.io.Write(apache_beam.io.BigQuerySink(my_options.outputBQTable.get(), write_disposition='WRITE_TRUNCATE'))
    )
    p.run()

if __name__ == '__main__':
    main()

【问题讨论】:

  • 我发现了一个与去年类似的问题,发布在 StackOverflow 上。它指出 Python 中的 ValueProviders(模板化参数)仅适用于常规文件 I/O,不适用于 BigQuery 之类的东西。这是(曾经)Python SDK 的限制。有谁知道这个状态? stackoverflow.com/questions/47134847/…

标签: python-2.7 google-cloud-platform google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

指定管道时不能使用my_options.outputBQTable.get()。 BigQuery 接收器已经知道如何使用运行时提供的参数,所以我认为您可以传递my_options.outputBQTable

根据我从文档中收集到的信息,您应该只在传递给ParDo 步骤的DoFns 的process 方法中使用options.runtime_argument.get()

注意:我使用 2.8.0 的 Apache Beam SDK 进行了测试,因此我使用了 WriteToBigQuery 而不是 BigQuerySink

【讨论】:

  • 如果您使用 my_options.outputBQTable 并部署管道模板,您将收到 RuntimeValue() 错误。据我所知,您不能将静态变量作为字符串传递到管道中,除非您将 DoFn 与 .get() 一起使用。如果可以,请告诉我。
  • @ethanenglish 你是对的。到目前为止,只有基于文件的 I/O 方法可以接受运行时参数。 cloud.google.com/dataflow/docs/guides/templates/…
【解决方案2】:

这是一个尚未为 Python SDK 开发的功能。

相关的open issue可以在Apache Beam项目页面找到。

在解决上述问题之前,目前的解决方法是使用 Java SDK。

【讨论】:

    猜你喜欢
    • 2019-02-15
    • 1970-01-01
    • 2017-09-21
    • 2016-06-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多