【问题标题】:How to read BigQuery table using python pipeline code in GCP Dataflow如何在 GCP Dataflow 中使用 python 管道代码读取 BigQuery 表
【发布时间】:2018-01-22 16:28:08
【问题描述】:

有人可以分享语法以在用 python 为 GCP 数据流编写的管道中读取/写入 bigquery 表

【问题讨论】:

    标签: python google-cloud-dataflow gcp


    【解决方案1】:

    在数据流上运行

    首先,构建一个Pipeline 并使用以下选项使其在 GCP DataFlow 上运行:

    import apache_beam as beam
    
    options = {'project': <project>,
               'runner': 'DataflowRunner',
               'region': <region>,
               'setup_file': <setup.py file>}
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
    pipeline = beam.Pipeline(options = pipeline_options)
    

    从 BigQuery 读取

    用您的查询定义BigQuerySource 并使用beam.io.Read 从BQ 读取数据:

    BQ_source = beam.io.BigQuerySource(query = <query>)
    BQ_data = pipeline | beam.io.Read(BQ_source)
    

    写入 BigQuery

    有两个选项可以写入 bigquery:

    • 使用BigQuerySinkbeam.io.Write

      BQ_sink = beam.io.BigQuerySink(<table>, dataset=<dataset>, project=<project>)
      BQ_data | beam.io.Write(BQ_sink)
      
    • 使用beam.io.WriteToBigQuery:

      BQ_data | beam.io.WriteToBigQuery(<table>, dataset=<dataset>, project=<project>)
      

    【讨论】:

    • 感谢 Robbe 的回复。但我需要使用 Google auth Json 文件连接到我创建的 Bigquery 表。请您指出需要在上述代码中进行的修改实现这一目标。
    • 您是指服务帐户吗? When running a pipeline on dataflow, it is automatically connected to the dataflow service account。您可以在 Cloud Console 中更改此服务帐号的权限。
    • 我刚开始数据流,你能帮帮我吗:我在 Cloudshell 上用下面的代码执行了 Python 文件
    • 出现错误:找不到记录器“oauth2client.contrib.multistore_file”的处理程序 Traceback(最近一次调用最后):文件“StackOverflow_B.py”,第 12 行,在 BQ_sink = beam .io.BigQuerySink(CustomerDetails_copy, dataset=StarPoc, project=star-poc) NameError: name 'CustomerDetails_copy' is not defined
    • 如果可能的话,请您分享一个小的完整工作代码,其中包含一些示例值,用于填写“查询”、“表名”值
    【解决方案2】:

    从 Bigquery 读取

    rows = (p | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
    

    写入 Bigquery

    rows | 'writeToBQ' >> beam.io.Write(
    beam.io.BigQuerySink('{}:{}.{}'.format(PROJECT, BQ_DATASET_ID, BQ_TEST), schema='CONVERSATION:STRING, LEAD_ID:INTEGER', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-03-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多