【问题标题】:How to write BigQuery results to GCS in JSON format using Apache Beam with custom formatting?如何使用具有自定义格式的 Apache Beam 以 JSON 格式将 BigQuery 结果写入 GCS?
【发布时间】:2021-09-21 17:55:14
【问题描述】:

我正在尝试使用 Python 中的 Apache Beam 将 BigQuery 表记录作为 JSON 文件写入 GCS 存储桶中。

我有一个 BigQuery 表 - my_project.my_dataset.my_table 像这样

我希望将表记录/条目写入 GCS 存储桶位置的 JSON 文件 - “gs://my_core_bucket/data/my_data.json”

预期的 JSON 格式:


[
    {"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}},
    {"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}},
    {"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}},
    {"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
]

但是,在我当前的 apache 管道实现中,我看到创建的 JSON 文件在文件“gs://my_core_bucket/data/my_data.json”中有类似这样的条目

{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}}
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}}
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}}
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}

如何创建一个干净的 JSON 文件,将 BigQuery 记录作为 JSON 数组元素?

这是我的管道代码。

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"]  # columns needed for processing  

        values = {}

        for x in range(len(rec_keys)):
            key = rec_keys[x]

            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # write formatted pcollection as JSON file
            prepared_data | 'JSON format' >> beam.Map(json.dumps)
            prepared_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()

【问题讨论】:

  • 输出非常有意义,因为 Apache Beam 会将输入 PCollection 的每个元素映射到 JSON,如所示。我不确定它是否会起作用,但请尝试CombineGloballyprepared_data | 'JSON format' >> beam. CombineGlobally(json.dumps)。请参阅relevant docs
  • @jccampanero - 它不适用于 beam.CombineGlobally(json.dumps) 。当我尝试使用 CombineGlobally 时,我看到错误“_ReiterableChain 类型的对象不是 JSON 可序列化的”
  • 拜托,你能试试提供的答案吗?我希望它有所帮助。请注意,我现在不认为以这种方式编写信息可能会影响性能。

标签: python-3.x google-cloud-platform google-bigquery apache-beam google-dataflow


【解决方案1】:

请按照 cmets 中的建议,尝试将所有结果合二为一。为了成功序列化作为组合过程获得的set,您可以使用自定义序列化器。

您的代码可能如下所示:

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# Based on https://stackoverflow.com/questions/8230315/how-to-json-serialize-sets
class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


# utility function for list combination
class ListCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accum in accumulators:
            merged += accum
        return merged

    def extract_output(self, accumulator):
        return accumulator



class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"]  # columns needed for processing  

        values = {}

        for x in range(len(rec_keys)):
            key = rec_keys[x]

            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # combine all the results in one PCollection
            # see https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/
            prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())

            # write formatted pcollection as JSON file. We will use a 
            # custom encoder for se serialization
            prepared_data | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)
            prepared_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()

【讨论】:

  • 尝试了新的组合方法,但收到此错误“ TypeError: descriptor 'union' requires a 'set' object but received a 'dict'”
  • 嗨@GopinathS。我知道了。对不起,我无法测试你的实际代码,我用简单的测试数据进行了测试。请问,您可以尝试更新的解决方案吗?我创建了一个自定义合并函数来合并您的结果。它可能会起作用,但无论如何,我不知道它是否是最佳选择。无论如何,我希望它有所帮助。
  • - 我必须像这样更改我的管道以使“ListCombineFn”工作( bq_data | 'Prepare for push' >> beam.ParDo(PrepForPush(CONFIG)) | 'Combine results' >> beam.CombineGlobally(ListCombineFn()) | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder) | 'Write Output' >> beam.io.WriteToText(file_name, shard_name_template='') ) 这行仍然只使用dictsprepared_data | 创建JSON 文件。 '合并结果' >> beam.CombineGlobally(ListCombineFn())
  • 嗨@GopinathS。你的意思是管道可以简化?它当时有效,还是您仍然面临同样的错误?很抱歉 GopinathS,但正如我告诉你的,我无法用你的实际数据进行测试,我只是用简单的用例进行测试,如果答案没有我希望的准确,我深表歉意
  • 嗨@jccampanero - 它有效,我能够将JSON文件创建为元素数组。自定义组合功能似乎可以解决这个问题。
【解决方案2】:

您可以直接在 BigQuery 中执行此操作,然后使用 Dataflow 按原样打印结果。

只更改查询

query = f"Select ARRAY_AGG(str) from (SELECT struct(id as id, name as name, address as address, phn as phn) as str FROM `{project}.{dataset}.{table}` LIMIT 4)"

请记住

  • BigQuery 处理总是比数据流处理(或同等芯片上的其他处理)更快、更便宜
  • Dataflow 将始终构建有效的 JSON(您的 JSON 无效,您不能从数组开始)

【讨论】:

  • 我将在下游使用 JSON 文件,因此在我的情况下,以预期格式创建 JSON 文件是非常必要的。数据也很大,为了解释,我限制为 4 条记录。
  • 基本上,我想将整个 BigQuery 表转换为 JSON 文件,并在以后使用它们进行处理。
猜你喜欢
  • 2019-03-26
  • 1970-01-01
  • 1970-01-01
  • 2022-01-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多