Title: Unable to pass BigQuery table name as ValueProvider to Dataflow template
Posted: 2020-12-17 14:11:56
Question:

I want to pass a BigQuery table name as a runtime parameter to my Dataflow template, like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str, help='BigQuery table reference DATASET.TABLE')
        parser.add_value_provider_argument('--output', type=str, help='BigQuery table reference DATASET.TABLE')

def run(argv=None):
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    user_options = pipeline_options.view_as(UserOptions)

    # Query
    query = f"""
    SELECT * FROM `{user_options.input}`
    WHERE last_scrape_date > (SELECT max(last_scrape_date) from `{user_options.output}`)
    """
    (p
     | 'Read from BQ Table' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             '{user_options.output}',
             schema=schema,  # schema is defined elsewhere (not shown in the question)
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()

if __name__ == '__main__':
    run()

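Following the suggestions in answer1 and answer2, I added the --experiment=use_beam_bq_sink flag to my command line. The template is created successfully, but launching it fails with this error: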

apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/vf-scrapers/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 15 Dec 2020 23:25:17 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '500', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
    "errors": [
      {
        "message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

I tried adding the .get() method to f'{user_options.input}' and got the same error.
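To see why both attempts fail, here is a minimal stdlib-only model of a runtime placeholder. `RuntimePlaceholder` and `RuntimeValueNotAvailable` are hypothetical names, not Beam's `RuntimeValueProvider`; the model only mimics the two behaviours visible above: formatting the object yields a description of the option rather than its value (compare the table name in the 400 error), and `.get()` has nothing to return until the job is actually launched. As far as I know, Beam's `RuntimeValueProvider.get()` is likewise only usable from a runtime context, so calling it while the template is being built cannot help.

```python
class RuntimeValueNotAvailable(Exception):
    """Raised when a placeholder is read before runtime values exist."""


class RuntimePlaceholder:
    """Hypothetical stand-in for a runtime option; NOT Beam's RuntimeValueProvider."""

    # Stays None during template construction; filled in when the job launches.
    runtime_values = None

    def __init__(self, option_name):
        self.option_name = option_name

    def get(self):
        # Reading the value is only allowed once runtime values are present.
        if RuntimePlaceholder.runtime_values is None:
            raise RuntimeValueNotAvailable(
                f"{self.option_name} is not available during construction")
        return RuntimePlaceholder.runtime_values[self.option_name]

    def __str__(self):
        # Formatting describes the option; it does not resolve its value.
        return f"RuntimePlaceholder(option: {self.option_name})"


# Template construction: no runtime values exist yet.
table = RuntimePlaceholder("input")

# An f-string formats the placeholder itself, so its description ends up in the SQL.
query = f"SELECT * FROM `{table}`"
print(query)  # SELECT * FROM `RuntimePlaceholder(option: input)`

# Calling .get() at this point cannot help either: there is no value to return yet.
try:
    table.get()
except RuntimeValueNotAvailable as exc:
    print("get() failed:", exc)  # get() failed: input is not available during construction
```

The key point the model illustrates: anything evaluated while the template is being built (f-strings, string concatenation, `.get()`) runs before the parameter has a value.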

Edit:

There is a ticket for this issue: https://issues.apache.org/jira/browse/BEAM-1440, but so far I don't understand its conclusion.

    Tags: python google-cloud-dataflow apache-beam


    Answer 1:

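    Runtime parameters (such as the BigQuery table name) are represented during pipeline construction as ValueProvider objects, not as string literals. You can see this in your error output: user_options.input shows up as a stringified RuntimeValueProvider. In this case, though, the fix is simple: pass the object in directly instead of converting it to a string, like this: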

    beam.io.WriteToBigQuery(
                 user_options.output,
                 schema=schema,
                 ...
    

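    This works because in a Dataflow template, pipeline construction happens before the runtime parameters are supplied. When this code runs, your input and output parameters are not defined yet. Instead, the ValueProvider acts as a placeholder that lets the value be retrieved at runtime, once it has been defined.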
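The same deferral idea also applies to the read side, where the query is built from two table names. Below is a stdlib-only sketch under stated assumptions: `Placeholder` and `Deferred` are hypothetical classes loosely modelled on Beam's `RuntimeValueProvider` and `NestedValueProvider`, and the table names are made up. Nothing is formatted at construction time; the translator function runs only when `.get()` is called at runtime. In real Beam, the analogous step would be wrapping one option as `NestedValueProvider(user_options.input, translator)` and passing it to a read transform that accepts a `ValueProvider` query (such as `ReadFromBigQuery`; check that your Beam version supports this, since the older `BigQuerySource` used in the question may not).

```python
class NotAtRuntime(Exception):
    """Raised when a value is requested before the job is launched."""


class Placeholder:
    """Hypothetical runtime option; resolves only once runtime values exist."""

    runtime_values = None  # set when the job launches

    def __init__(self, option_name):
        self.option_name = option_name

    def get(self):
        if Placeholder.runtime_values is None:
            raise NotAtRuntime(f"{self.option_name} read at construction time")
        return Placeholder.runtime_values[self.option_name]


class Deferred:
    """Hypothetical analogue of a nested provider: applies translator on get()."""

    def __init__(self, source, translator):
        self.source = source
        self.translator = translator

    def get(self):
        # Both the source lookup and the translation happen at runtime.
        return self.translator(self.source.get())


# --- template construction: no values exist yet, so nothing is formatted ---
user_input = Placeholder("input")
user_output = Placeholder("output")


def build_query(input_table):
    # user_output.get() is called inside the translator, i.e. only at runtime.
    return (
        f"SELECT * FROM `{input_table}` "
        f"WHERE last_scrape_date > "
        f"(SELECT max(last_scrape_date) FROM `{user_output.get()}`)"
    )


query = Deferred(user_input, build_query)

# --- job launch: runtime values arrive, then the query is resolved ---
Placeholder.runtime_values = {"input": "my_dataset.source", "output": "my_dataset.target"}
print(query.get())
```

Reading the second table name inside the translator is a design choice that keeps a single deferred object responsible for the whole query string; it only works because the translator itself is never called before runtime.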
