【问题标题】:Pulling JSONs from Google Cloud Storage, converting into pandas DF, and writing to Google BigQuery从 Google Cloud Storage 中提取 JSON,转换为 pandas DF,并写入 Google BigQuery
【发布时间】:2020-11-09 05:59:00
【问题描述】:

总结:在将 pandas 数据帧附加到 BigQuery 时,types 不同,导致日常 ETL 流程出现问题。

我正在使用 Airflow 进行简单的 ETL:每天从 API 中提取数据,将原始数据备份到 Google Cloud Storage (GCS) 中的 JSON 文件中,然后将 GCS 中的数据附加到 BigQuery 数据库中。我对 ETL 的 extract 部分做得很好,调用 API 并将每个 API 调用的结果(将是数据库表中的一行)保存为 GCS 中它自己的 JSON 对象。那么对于 BigQuery 中具有 1K 行的表,我将首先创建/保存 1K 个单独的对象,这些对象保存到 GCS 的存储桶中,每个对象都是 API 调用的结果。

我现在正在努力处理 ETL 的 load 部分。到目前为止,我已经编写了以下脚本来完成从 GCSBQ 的转移:

# load libraries, connect to google
from google.cloud import storage
import os
import gcsfs
import json
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/my/credentials'

# transfer data
def load_directory_to_bq():

    # get list of filenames from GCS directory
    client = storage.Client()
    files = []
    blobs = client.list_blobs('my-gcs-bucket', prefix='gcs-path-to-files')
    for blob in blobs:
        files.append(f'my-gcs-bucket/{blob.name}')
    

    # approach A: This loop pulls json, converts into df, writes to BigQuery, each 1 file at a time
    fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
    for file in files:
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            pd.DataFrame.to_gbq(this_df, 'my-bq-tablename', project_id='my-gcp-project-id', if_exists='append')


    # approach B: This loop loops all the files, creates 1 large dataframe, and does 1 large insert into BigQuery
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
    for file in files:
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            output_df = output_df.append(this_df)

    pd.DataFrame.to_gbq(output_df, 'my-bq-tablename', project_id='my-gcp-project-id', if_exists='append')

GCS 中的 1K 对象都是相似的,但并不总是具有完全相同的结构:

  • 几乎所有相同的键
  • 每个键的“类型”几乎总是相同

但是,对于某些 JSON 对象,“类型”可以是不同的,对于相同的键,不同的对象。当作为 1 行 pandas 数据框加载到 python 中时,相同的键 key1 可能是 floatinteger,具体取决于值。此外,有时对象中缺少键,或者其值/属性为null,这会弄乱“类型”并在使用to_gbq 函数时导致问题。

使用上面的方法A,第一次对象/pandas DF 具有不同的类型时,会引发以下错误:Please verify that the structure and data types in the DataFrame match the schema of the destination table. 方法A 似乎也效率低下,因为它调用to_gbq 1K 行中的每一行,每次调用需要 2-3 秒。

通过 B 方法,似乎解决了不同的“类型”问题,因为 pandas 在其 append 函数中处理不同的“类型”以将 2 个数据帧附加在一起。结果,我得到了 1 个数据帧,并且可以将其附加到 BigQuery。但是,我仍然担心将来可能需要附加的新数据与现有表中已有的类型不匹配。毕竟,我不是在 BigQuery 中查询旧表、追加到新数据,然后重新创建表。我只是追加新行,我担心其中一个键具有不同“类型”的表会导致错误并破坏我的管道。

理论上,A 方法很好,因为可以处理使用to_gbq 附加到表中的任何单个行而没有错误的方法很好。但它需要确保每一行都有相同的键/类型。使用方法B,我认为python 将不同的类型自动合并为表的一种类型并不好,因为这似乎会导致新数据出现问题。

我正在考虑这里最好的方法是什么。由于两者都是谷歌产品,从 GCS 到 BQ 应该很简单,但不完善的数据会使它稍微困难一些。特别是,我是否应该在某处为每个不同的 BQ 表定义一个显式表模式,并编写一个 python 函数来确保正确的类型/将错误的类型转换为正确的类型?我应该每次都在 BQ 中重新创建表吗?我应该完全避免使用 Python 并以其他方式从 GCS 转移到 BQ 吗?

【问题讨论】:

  • 你会经常执行这个过程吗?如果你不是唯一关心的应该是架构更新
  • 此过程将每天运行
  • 您每天每次 API 调用都会在 GCS 上创建 1000 个对象吗?为什么决定单独保存每个 JSON 对象?或者您目前需要迁移这 1000 个对象但普通流程会有所不同?
  • 另外,GCS 在您的 ETL 流程中的作用是什么?为什么需要 API 作为数据源和 GBQ 作为目标存储之间的这种中间区别?
  • @EgorBEremeev - GCS 是 (a) 对原始数据进行备份,(b) 如果 BQ 或其他地方出现问题,则作为原始数据的真实来源,(c)以防止在上传到 BQ 之前出现问题时必须进行两次相同的 API 调用

标签: python pandas google-bigquery etl airflow


【解决方案1】:

关于你的方法 A 和 B,我有以下考虑:

  1. 如果请求很慢并且行数很大,那么方法 B 肯定会更快。
  2. 我不知道您的数据量,但请记住,如果您有大量数据,则必须注意机器容量,以避免性能不佳和错误。
  3. 如果您的流程每天只执行一次,那么将所有数据插入表中所花费的时间可能根本不是问题。
  4. 如您所说,方法 B 可以避免架构问题,但不能保证。

鉴于此,我想提出以下措施。

  1. 对于文件中可能丢失信息(或可能为 NULL)的键,请将 BigQuery 表中的相应字段设置为 NULLABLE
  2. 使用方法 A 或 B,通过使用转换 Dataframe 列的某些函数确保 Dataframe 具有正确的类型。您可以更改 Dataframe 列的类型,例如 df.astype({"key1": float, "key2": int, [...]}),您可以在此 reference 中找到。

【讨论】:

  • 这很有帮助。并且使用方法 A 从来都不是我会使用的真正方法 - 它只是暴露了当 .append() 数据帧在一起时 pandas 正在处理的type 问题。我将创建一个函数,将每个数据帧转换为 BigQuery 的正确架构。这似乎很聪明。谢谢。
【解决方案2】:

嗯,实际上你问的是 ETL 中的转换阶段,因为加载显然是由你已经使用的 pandas.DataFrame.to_gbq() 方法完成的。

让我们来看看你描述的整个 ETL 流:

来源:API -> GCS -> Pandas DataFrame -> 目的地:GBQ

注意:

  • 您在 API 和 GCS 之间执行哪些数据转换?

然而,实际上,这里有 2 个 ETL 流:

  1. 来源:API -> ?? -> 目的地:GCS(JSON 对象)
  2. 来源:GCS(JSON 对象)-> Pandas DataFrame -> 目的地:GBQ(表格)

实际上,数据格式变化的根本原因来自您的 API,因为它返回 JSON 作为响应。因为 JSON 是无模式对象。 自然地,这种格式变化会传播到您的 GCS 对象中。 另一方面,作为目的地,您有 GBQ 表,该表从创建那一刻起就具有严格的架构和cannot be altered after

因此,为了有效地将来自 REST API 的数据加载到 GBQ,您可以遵循以下想法:

  1. JSON 是一种嵌套的数据结构,而表是扁平的。所以任务是将第一个转换为第二个。

  2. 通过检查您的 API 响应对象并定义来解决这个问题

    • 可以标准化为平面表模式的最广泛的可能字段集。就像,所有可选字段都会同时出现。
    • 您的 JSON 中的一个数组,它们是自复杂的对象,您非常需要它来提取和加载。与他们一起完成第 1 步。
  3. 拥有这样的平面架构理解计划创建具有所有 NULLABLE 字段的 GBQ 表(每个您将实际提取的对象的单独一个)。

  4. 如果你使用 Pandas DataFrame 进行转换,那么:

    • 明确定义列的数据类型。这可以避免在推断 pandas dtype 取决于即将到来的数据时出现问题。注意这里pandas-gbq documentation
    • 数组自然会转换为 DataFrame,然后您将在一次 GBQ API 调用中加载所有记录。

此外,您可以重新考虑 ETL 流。

目前,您说过,GCS 的作用是:

  • (a) 备份原始数据
  • (b) 如果 BQ 或其他地方出现问题,则作为原始数据的真实来源
  • (c) 防止在上传到 BQ 之前出现问题时必须进行两次相同的 API 调用

当您将数据同时加载到 GCS 和 GBQ 时,所有这些都可以实现。但是您可以通过一个常见的转换阶段来做到这一点。

Source: API -> Pandas DataFrame
    1. |-> Destination: GBQ (table)
    2. |-> Destination: GCS (objects)

您可以通过以下方式使用 Pandas DataFrame 执行转换阶段:

  1. 将 JSON 对象嵌套到平面表(DataFrame)中:

    df = pd.json_normalize(api_response_json_object, 'api_response_nested_json_object', sep='_')
    
  2. 强制字段数据类型:

    def force_df_schema(df, columns_list, columns_dtypes):
        df = df.reindex(columns_list, axis="columns")
        df = df.astype(columns_dtypes)
        return df
    
    API_TRANSACTION_OBJECT_COLUMNS = ['c1', 'c2', 'c3', 'c4']
    API_TRANSACTION_OBJECT_COLUMNS_DTYPES = {
        'c1': 'object',
        'c2': 'datetime64[ns]',
        'c3': 'float64',
        'c4': 'int'
    }
    
    # Let's this call will returns JSON with, for example,
    # {transaction} nested structure, which we need to extract, transform and load 
    api_response_json_object = api.call()
    
    df = pd.json_normalize(api_response_json_object, 
                           'api_response_nested_json_object', sep='_')
    
    df = force_df_schema(df, API_TRANSACTION_OBJECT_COLUMNS,
                             API_TRANSACTION_OBJECT_COLUMNS_DTYPES)
    
  3. 加载到目标存储:

像你已经做的那样去GBQ

 ```
 pd.DataFrame.to_gbq(df, 'bq-tablename', project_id='gcp-project-id', if_exists='append') 
 #also this can create the initial GBQ table,
 #types will be inffered as mentioned in the pandas-bgq docs above.
 ```

也像你已经做的那样去 GCS。

【讨论】:

    猜你喜欢
    • 2016-07-18
    • 2020-01-19
    • 1970-01-01
    • 2019-02-14
    • 2019-11-23
    • 1970-01-01
    • 2018-01-28
    • 2023-04-08
    • 1970-01-01
    相关资源
    最近更新 更多