【发布时间】:2019-12-31 18:56:19
【问题描述】:
我想使用 Python 和 Dataflow 在 BigQuery 中读取一个表。我事先不知道表的名称。我正在使用模板来传递表名,如下所示:
.
.
.
from apache_beam.options.pipeline_options import PipelineOptions
class DataflowOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--table_name',
help='Name of table on BigQuery')
def run(argv=None):
pipeline_options = PipelineOptions()
dataflow_options = pipeline_options.view_as(DataflowOptions)
with beam.Pipeline(options=pipeline_options) as pipeline:
table_spec = bigquery.TableReference(
projectId='MyProyectId',
datasetId='MyDataset',
tableId=str(dataflow_options.table_name))
p = (pipeline | 'Read Table' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
if __name__ == '__main__':
run()
但是当我启动作业时,我收到以下错误:
Workflow failed. Causes: S01:Read Table+Batch Users/ParDo(_GlobalWindowsBatchingDoFn)+Hash Users+Upload to Ads failed., BigQuery getting table "RuntimeValueProvider(option: table_name, type: str, default_value: None)" from dataset "MyDataset" in project "MyProject" failed., BigQuery execution failed., Error:
Message: Invalid table ID "RuntimeValueProvider(option: table_name, type: str, default_value: None)".
HTTP Code: 400
我阅读了this answer,但到目前为止还没有 2017 年的东西吗?
【问题讨论】:
-
如果我理解正确,您想从 BigQuery 中读取表,或者批量插入到不同的 BigQuery 表中,如 here?
-
您好,您还有问题吗?
标签: python google-cloud-dataflow apache-beam