【发布时间】:2020-11-09 05:59:00
【问题描述】:
总结:在将 pandas 数据帧附加到 BigQuery 时,types 不同,导致日常 ETL 流程出现问题。
我正在使用 Airflow 进行简单的 ETL:每天从 API 中提取数据,将原始数据备份到 Google Cloud Storage (GCS) 中的 JSON 文件中,然后将 GCS 中的数据附加到 BigQuery 数据库中。我对 ETL 的 extract 部分做得很好,调用 API 并将每个 API 调用的结果(将是数据库表中的一行)保存为 GCS 中它自己的 JSON 对象。那么对于 BigQuery 中具有 1K 行的表,我将首先创建/保存 1K 个单独的对象,这些对象保存到 GCS 的存储桶中,每个对象都是 API 调用的结果。
我现在正在努力处理 ETL 的 load 部分。到目前为止,我已经编写了以下脚本来完成从 GCS 到 BQ 的转移:
# load libraries, connect to google
from google.cloud import storage
import os
import gcsfs
import json
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/my/credentials'
# transfer data
def load_directory_to_bq():
# get list of filenames from GCS directory
client = storage.Client()
files = []
blobs = client.list_blobs('my-gcs-bucket', prefix='gcs-path-to-files')
for blob in blobs:
files.append(f'my-gcs-bucket/{blob.name}')
# approach A: This loop pulls json, converts into df, writes to BigQuery, each 1 file at a time
fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
for file in files:
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
pd.DataFrame.to_gbq(this_df, 'my-bq-tablename', project_id='my-gcp-project-id', if_exists='append')
# approach B: This loop loops all the files, creates 1 large dataframe, and does 1 large insert into BigQuery
output_df = pd.DataFrame()
fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
for file in files:
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
output_df = output_df.append(this_df)
pd.DataFrame.to_gbq(output_df, 'my-bq-tablename', project_id='my-gcp-project-id', if_exists='append')
GCS 中的 1K 对象都是相似的,但并不总是具有完全相同的结构:
- 几乎所有相同的键
- 每个键的“类型”几乎总是相同
但是,对于某些 JSON 对象,“类型”可以是不同的,对于相同的键,不同的对象。当作为 1 行 pandas 数据框加载到 python 中时,相同的键 key1 可能是 float 或 integer,具体取决于值。此外,有时对象中缺少键,或者其值/属性为null,这会弄乱“类型”并在使用to_gbq 函数时导致问题。
使用上面的方法A,第一次对象/pandas DF 具有不同的类型时,会引发以下错误:Please verify that the structure and data types in the DataFrame match the schema of the destination table. 方法A 似乎也效率低下,因为它调用to_gbq 1K 行中的每一行,每次调用需要 2-3 秒。
通过 B 方法,似乎解决了不同的“类型”问题,因为 pandas 在其 append 函数中处理不同的“类型”以将 2 个数据帧附加在一起。结果,我得到了 1 个数据帧,并且可以将其附加到 BigQuery。但是,我仍然担心将来可能需要附加的新数据与现有表中已有的类型不匹配。毕竟,我不是在 BigQuery 中查询旧表、追加到新数据,然后重新创建表。我只是追加新行,我担心其中一个键具有不同“类型”的表会导致错误并破坏我的管道。
理论上,A 方法很好,因为可以处理使用to_gbq 附加到表中的任何单个行而没有错误的方法很好。但它需要确保每一行都有相同的键/类型。使用方法B,我认为python 将不同的类型自动合并为表的一种类型并不好,因为这似乎会导致新数据出现问题。
我正在考虑这里最好的方法是什么。由于两者都是谷歌产品,从 GCS 到 BQ 应该很简单,但不完善的数据会使它稍微困难一些。特别是,我是否应该在某处为每个不同的 BQ 表定义一个显式表模式,并编写一个 python 函数来确保正确的类型/将错误的类型转换为正确的类型?我应该每次都在 BQ 中重新创建表吗?我应该完全避免使用 Python 并以其他方式从 GCS 转移到 BQ 吗?
【问题讨论】:
-
你会经常执行这个过程吗?如果你不是唯一关心的应该是架构更新
-
此过程将每天运行
-
您每天每次 API 调用都会在 GCS 上创建 1000 个对象吗?为什么决定单独保存每个 JSON 对象?或者您目前需要迁移这 1000 个对象但普通流程会有所不同?
-
另外,GCS 在您的 ETL 流程中的作用是什么?为什么需要 API 作为数据源和 GBQ 作为目标存储之间的这种中间区别?
-
@EgorBEremeev - GCS 是 (a) 对原始数据进行备份,(b) 如果 BQ 或其他地方出现问题,则作为原始数据的真实来源,(c)以防止在上传到 BQ 之前出现问题时必须进行两次相同的 API 调用
标签: python pandas google-bigquery etl airflow