【发布时间】:2019-02-15 08:12:21
【问题描述】:
我正在使用 Python Apache Beam Dataflow 开发一个项目,我需要从启动数据流模板时提供的运行时参数中命名 bigquery 表。
到目前为止,我没有运气,它要么为我提供了运行时参数的定义,要么为我提供了一个空字符串。
所以我基本上需要它以某种方式工作:
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--path',
type=str,
help='csv storage path')
parser.add_value_provider_argument(
'--table_name',
type=str,
help='Table Id')
def run()
def rewrite_values(element):
""" Rewrite default env values"""
try:
logging.info("File Path with str(): {}".format(str(custom_options.path)))
logging.info("----------------------------")
logging.info("element: {}".format(element))
project_id = str(cloud_options.project)
file_path = custom_options.path.get()
table_name = custom_options.table_name.get()
logging.info("project: {}".format(project_id))
logging.info("File path: {}".format(file_path))
logging.info("language: {}".format(table_name))
logging.info("----------------------------")
except Exception as e:
logging.info("Error format----------------------------")
raise KeyError(e)
return file_path
pipeline_options = PipelineOptions()
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
# Beginning of the pipeline
p = beam.Pipeline(options=pipeline_options)
init_data = (p
| beam.Create(["Start"])
| beam.FlatMap(rewrite_values))
pipeline processing, running pipeline etc.
save_data_bigquery = (table_data | "Get all numbers" >> beam.ParDo(GetAllNumbers())
| 'Flat items' >> beam.FlatMap(flat_item)
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project_id,
dataset="defined_dataset",
table=table_name, **********
schema="id:STRING",
batch_size=10000)
)
在 writetobigquery 函数中命名表是我遇到问题的地方,我也尝试过使用 custom_options.table_name,将变量声明为全局等。
我创建了一个自定义 DoFn 来写入 BigQuery,尽管这是我的首选方法。
【问题讨论】:
-
您可以在 WriteToBigQuery 转换上执行
table=custom_options.table_name.get()吗?一旦你提交了管道,这应该足以为其提供价值.. -
不,我不能,它会引发一个错误,告诉我运行时值提供程序未在运行时上下文中使用。
-
看这个例子:cloud.google.com/dataflow/docs/templates/…,它似乎没有在提供值的参数上使用
get()。会这样吗? -
这允许管道启动,并启动模板,尽管管道失败,因为表的名称本质上是:“{Runtime Parameter: type:str etc}”
-
如您所见here@Pablo,bigquery 转换不是从自定义源读取的,即它没有在 SDK 中完全实现......我基本上只是想看看是否有人找到了解决办法...
标签: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam