【Posted】: 2020-12-17 14:11:56
【Question】:
I want to pass BigQuery table names as runtime parameters to my Dataflow template, like this:
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str, help='BigQuery table reference DATASET.TABLE')
        parser.add_value_provider_argument('--output', type=str, help='BigQuery table reference DATASET.TABLE')

def run(argv=None):
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    user_options = pipeline_options.view_as(UserOptions)
    # Query
    query = f"""
        SELECT * FROM `{user_options.input}`
        WHERE last_scrape_date > (SELECT max(last_scrape_date) from `{user_options.output}`)
    """
    (p
     | 'Read from BQ Table' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             '{user_options.output}',
             schema=schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()
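Following the suggestions in answer1 and answer2, I added the --experiment=use_beam_bq_sink flag to my command line. The template renders successfully, but launching it fails with this error: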
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/vf-scrapers/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 15 Dec 2020 23:25:17 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '500', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
"errors": [
{
"message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
"domain": "global",
"reason": "invalid"
}
],
"status": "INVALID_ARGUMENT"
}
}
I tried adding the .get() method to f'{user_options.input}' and got the same error.
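As far as I can tell from the error message, the f-string is evaluated while the template is being built, so the text of the placeholder object ends up in the query instead of the table name. Below is a minimal sketch of that behaviour in plain Python. `DeferredOption` and its `resolve()` method are hypothetical stand-ins that I made up for illustration; they are not Beam's `RuntimeValueProvider`, and the table name is invented.

```python
class DeferredOption:
    """Hypothetical stand-in for an option whose value arrives at launch time.

    This is NOT Beam's RuntimeValueProvider; it only mimics the idea.
    """

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Simulates the value being supplied when the job is launched.
        self._value = value
        self._resolved = True

    def get(self):
        if not self._resolved:
            raise RuntimeError(f"{self.name} is not available until runtime")
        return self._value

    def __str__(self):
        return f"DeferredOption(option: {self.name})"


table = DeferredOption("input")

# Built at construction time: the placeholder's string form is baked
# into the query text, much like the table name in the error above.
query_at_build_time = f"SELECT * FROM `{table}`"

# The value arrives later, but the string built earlier is unchanged.
table.resolve("my_dataset.my_table")  # invented table name
query_at_run_time = f"SELECT * FROM `{table.get()}`"

print(query_at_build_time)
print(query_at_run_time)
```

If this reading is right, the query string would have to be assembled only after the value is available, rather than in `run()` when the pipeline graph is constructed.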
Edit:
There is a ticket for this issue: https://issues.apache.org/jira/browse/BEAM-1440, but so far I do not understand its conclusion.
【Discussion】:
Tags: python google-cloud-dataflow apache-beam