【问题标题】:Naming BigQuery Table From Template Runtime Parameters, Python, Apache Beam, Dataflow从模板运行时参数、Python、Apache Beam、Dataflow 命名 BigQuery 表
【发布时间】:2019-02-15 08:12:21
【问题描述】:

我正在使用 Python Apache Beam Dataflow 开发一个项目,我需要从启动数据流模板时提供的运行时参数中命名 bigquery 表

到目前为止,我没有运气,它要么为我提供了运行时参数的定义,要么为我提供了一个空字符串。

所以我基本上需要它以某种方式工作:

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')
def run()
    def rewrite_values(element):
        """ Rewrite default env values"""
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

pipeline processing, running pipeline etc.

save_data_bigquery = (table_data | "Get all numbers" >> beam.ParDo(GetAllNumbers())
                      | 'Flat items' >> beam.FlatMap(flat_item)
                      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project_id,
                                                                       dataset="defined_dataset",
                                                                       table=table_name, **********
                                                                       schema="id:STRING",
                                                                       batch_size=10000)
                      )

在 writetobigquery 函数中命名表是我遇到问题的地方,我也尝试过使用 custom_options.table_name,将变量声明为全局等。

我创建了一个自定义 DoFn 来写入 BigQuery,尽管这是我的首选方法。

【问题讨论】:

  • 您可以在 WriteToBigQuery 转换上执行 table=custom_options.table_name.get() 吗?一旦你提交了管道,这应该足以为其提供价值..
  • 不,我不能,它会引发一个错误,告诉我运行时值提供程序未在运行时上下文中使用。
  • 看这个例子:cloud.google.com/dataflow/docs/templates/…,它似乎没有在提供值的参数上使用get()。会这样吗?
  • 这允许管道启动,并启动模板,尽管管道失败,因为表的名称本质上是:“{Runtime Parameter: type:str etc}”
  • 如您所见here@Pablo,bigquery 转换不是从自定义源读取的,即它没有在 SDK 中完全实现......我基本上只是想看看是否有人找到了解决办法...

标签: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

我尝试编写一个 BQ_writer 类并在其中编写了实际的 WriteToBigQuery。

class BQ_writer(beam.DoFn):
    def __init__(self, schema, output):
        self.output = output
        self.schema = schema

    def process(self, element):
        schema_l = self.schema.get()
        output_table_l = self.output.get()
        logging.info('Writing to table and schema: {}  {}'.format(output_table_l,schema_l))
        beam.io.WriteToBigQuery(output_table_l,
                                schema=schema_l,
                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

然后在管道中:

| 'WriteToBigQuery' >> beam.ParDo(BQ_writer(useroptions.schema,useroptions.output))

这工作正常,流程构建没有错误。但是在大查询表中找不到数据。可能是我们不能在 ParDo 函数中使用 WriteToBigQuery。欢迎提出建议如何从这里开始..

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-08-12
    • 2020-03-31
    • 2020-07-22
    • 2020-06-22
    • 1970-01-01
    • 2018-10-17
    • 1970-01-01
    • 2018-12-18
    相关资源
    最近更新 更多