【发布时间】:2021-06-06 23:00:01
【问题描述】:
背景:
我有 12,000 个 csv 文件 (50gb) 的数据,它们大多具有相同的格式,但有些可能缺少一两列,有些标题行可能并不总是从文件的第一行开始。
我有一个包含几个函数的类,这些函数利用 pandas 来分析和规范化存储在本地或来自谷歌存储桶的这些 csv 文件。
在这些函数中发生以下操作:
在analyze_files
- 遍历所有文件,“查看”其内容以确定标题以及是否需要跳过任何行才能到达标题行。
- 将所有收集的标头转换为标准格式,从文件名中删除除字母数字和下划线之外的所有标头。
在normalize_files
- 遍历所有文件,这次完全加载每个文件。
- 将列标题从
analyze_files转换为标头的standardizwd 版本。 - 上传或保存文件的更新版本
函数按预期工作。但是,我正在寻找可以用来加快速度的方法。
使用以下版本(简化为 mvce)和 12,000 个本地文件(8 核 16gb ram)
-
analyze_files大约需要 2-4 分钟 -
normalize_files大约需要 52 分钟
from google.cloud import storage
import pandas as pd
import glob
import os
import re
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./service_account_details.json"
class MyClass(object):
def __init__(self, uses_gs=False, gs_bucket_name=None, gs_folder_path=None):
self.__uses_gs = uses_gs
if uses_gs:
self.__gs_client = storage.Client()
self.__gs_bucket_name = gs_bucket_name
self.__gs_bucket = self.__gs_client.get_bucket(gs_bucket_name)
self.__gs_folder_path = gs_folder_path
else:
# save to a subfolder of current directory
self.__save_location = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.__name__)
if not os.path.exists(self.__save_location):
os.mkdir(self.__save_location)
self.__file_analysis = dict()
self.__file_columns = set()
self.__file_column_mapping = dict()
def analyze_files(self):
# collect the list of files
files_to_analyze = list()
if self.__uses_gs:
gs_files = self.__gs_client.list_blobs(self.__gs_bucket, prefix=self.__gs_folder_path, delimiter="/")
for file in gs_files:
if file.name == self.__gs_folder_path:
continue
gs_filepath = f"gs://{self._gs_bucket_name}/{file.name}"
files_to_analyze.append(gs_filepath)
else:
local_files = glob.glob(os.path.join(self.__save_location, "*.csv"))
files_to_analyze.extend(local_files)
# analyze each collected file
for filepath in files_to_analyze:
# determine how many rows to skip in order to start at the header row,
# then collect the headers for this particular file, to be utilized for comparisons in `normalize_files`
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist()
self.__file_columns.update(headers)
# store file details as pandas parameters, so we can smoothly transition into reading the files efficiently
skiprows = skiprows + 1 if skiprows else 1 # now that we know the headers, we can skip the header row
self.__file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
# convert the columns to their bigquery-compliant equivalents
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
self.__file_column_mapping.update({
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in self.__file_columns
})
def normalize_files(self):
# perform the normalizations and upload/save the final results
total_columns = len(self.__file_columns)
for filepath, params in self.__file_analysis.items():
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=self.__file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
# swap the missing column names out for the bigquery equivalents
missing_columns = [self.__file_column_mapping[c] for c in self.__file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
if self.__uses_gs:
blob_path = filepath[5 + len(self.__gs_bucket_name) + 1:] # "gs://" + "{bucket_name}" + "/"
self.__gs_bucket.blob(blob_path).upload_from_string(df.to_csv(index=False), "text/csv")
else: # save locally
df.to_csv(filepath, index=False)
我考虑过使用dask,结合multiprocessing 模块中的ProcessPool 和ThreadPool。但是,我正在为究竟采取什么方法而苦苦挣扎。
由于数据帧操作受 CPU 限制,它们似乎最适合 dask,可能与 ProcessPool 结合使用,将 12k 文件分配到 8 个可用内核中,然后 dask 将利用每个内核的线程核心(克服 GIL 限制)。
将文件上传回磁盘或谷歌存储桶似乎更适合ThreadPool,因为该活动是网络绑定的。
至于从 Google 存储桶中读取文件,我不确定哪种方法最有效。
基本上可以归结为两种风景:
- 在处理本地文件时,哪些方法/逻辑表现最佳?
- 在从 Google 存储桶中提取和保存(覆盖/更新)存储桶时,哪些方法/逻辑表现最佳?
有人可以提供一些可以为上述两个功能提供最有效速度提升的方向或代码吗?
我们将非常感谢基准测试,因为我在一周的大部分时间里一直在思考这个话题,如果有统计数据来支持方法的决定,那就太好了。
我尝试过的当前基准
def local_analysis_test_dir_pd(test_dir):
file_analysis, file_columns = dict(), set()
local_files = glob.glob(os.path.join(test_dir, "*.csv"))
for filepath in local_files:
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist() # noqa
skiprows = skiprows + 1 if skiprows else 1
file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
file_columns.update(headers)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['local_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def local_analysis_test_dir_dd(test_dir):
file_analysis, file_columns = dict(), set()
local_files = glob.glob(os.path.join(test_dir, "*.csv"))
def dask_worker(filepath):
siloed_analysis, siloed_columns = dict(), set()
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
return siloed_analysis, siloed_columns
headers = df.columns.values.tolist()
siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
siloed_columns.update(headers)
return siloed_analysis, siloed_columns
dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in local_files]
file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
for analysis in file_analyses:
file_analysis.update(analysis)
file_columns.update(*column_sets)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['local_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
def remote_analysis_test_dir_pd(test_dir):
remote_files, file_analysis, file_columns = list(), dict(), set()
prefix = test_dir.replace("gs://webscraping/", "") + "/"
gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
for file in gs_files:
if file.name == prefix:
continue
elif file.name.endswith(".xlsx"):
continue
elif not file.name.endswith(".csv"):
continue
gs_filepath = f"gs://webscraping/{file.name}"
remote_files.append(gs_filepath)
for filepath in remote_files:
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist() # noqa
skiprows = skiprows + 1 if skiprows else 1
file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
file_columns.update(headers)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['remote_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def remote_analysis_test_dir_dd(test_dir):
remote_files, file_analysis, file_columns = list(), dict(), set()
prefix = test_dir.replace("gs://webscraping/", "") + "/"
gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
for file in gs_files:
if file.name == prefix:
continue
elif file.name.endswith(".xlsx"):
continue
elif not file.name.endswith(".csv"):
continue
gs_filepath = f"gs://webscraping/{file.name}"
remote_files.append(gs_filepath)
def dask_worker(filepath):
siloed_analysis, siloed_columns = dict(), set()
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
return siloed_analysis, siloed_columns
headers = df.columns.values.tolist()
siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
siloed_columns.update(headers)
return siloed_analysis, siloed_columns
dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in remote_files]
file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
for analysis in file_analyses:
file_analysis.update(analysis)
file_columns.update(*column_sets)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['remote_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def normalization_plain_with_pd(file_analysis, file_columns, file_column_mapping, meta_columns):
total_columns = len(file_columns)
for filepath, params in file_analysis.items():
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
fpath, fname = os.path.split(filepath)
if not fpath.startswith("gs://"):
updated_path = os.path.join(fpath, "normalized_with_pd")
if not os.path.exists(updated_path):
os.mkdir(updated_path)
new_path = os.path.join(updated_path, fname)
else:
new_path = "/".join([fpath, "normalized_with_pd", fname])
df.to_csv(new_path, index=False)
def normalization_plain_with_dd(file_analysis, _file_columns, _file_column_mapping, _meta_columns):
def dask_worker(file_item, file_columns, file_column_mapping, meta_columns):
total_columns = len(file_columns)
filepath, params = file_item
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
fpath, fname = os.path.split(filepath)
if not fpath.startswith("gs://"):
updated_path = os.path.join(fpath, "normalized_with_dd")
if not os.path.exists(updated_path):
os.mkdir(updated_path)
new_path = os.path.join(updated_path, fname)
else:
new_path = "/".join([fpath, "normalized_with_dd", fname])
df.to_csv(new_path, index=False)
dask_futures = [
dask.delayed(dask_worker)(file_item, _file_columns, _file_column_mapping, _meta_columns)
for file_item in file_analysis.items()
]
dask.compute(*dask_futures)
if __name__ == "__main__":
for size, params in local_dirs.items():
print(f"['{size}_local_analysis_dir_tests'] ({params['items']} files, {params['size']})")
local_analysis_test_dir_pd(params["directory"])
local_analysis_test_dir_dd(params["directory"])
for size, settings in local_dirs.items():
print(f"['{size}_pre_test_file_cleanup']")
for file in glob.glob(os.path.join(settings["directory"], '*', '*.csv')):
os.remove(file)
print(f"['{size}_local_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
files, columns, column_mapping = local_analysis_test_dir_pd(settings["directory"])
local_normalization_plain_with_pd(files, columns, column_mapping, {})
local_normalization_plain_with_dd(files, columns, column_mapping, {})
for size, settings in remote_dirs.items():
print(f"['{size}_remote_analysis_dir_tests'] ({settings['items']} files, {settings['size']})")
_, _, _ = remote_analysis_test_dir_pd(settings["directory"])
files, columns, column_mapping = remote_analysis_test_dir_dd(settings["directory"])
print(f"['{size}_remote_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
normalization_plain_with_pd(files, columns, column_mapping, {})
normalization_plain_with_dd(files, columns, column_mapping, {})
到目前为止的结论:
-
local_analysis最快,pandas.from_csv,基于:- 343 MB 的单个文件(使用
pandas为 0.0210 秒 VS 使用dask为 0.5141 秒) - 8 个文件/1.12 GB 的小目录(使用
pandas为 0.1263 秒,使用dask为 0.1357 秒) - 474 个文件/2.03 GB 的中等目录(使用
pandas为 3.2991 秒,使用dask为 3.7717 秒) - 13,361 个文件/46.30 GB 的 xlarge 目录(使用
pandas时为 131.5941 秒,使用dask时为 132.6982 秒)
- 343 MB 的单个文件(使用
-
local_normalization最快,pandas.from_csv,基于:- 8 个文件/1.12 GB 的小目录(使用
pandas为 61.2338 秒,使用dask为 62.2033 秒) - 474 个文件/2.03 GB 的中等目录(使用
pandas为 136.8900 秒 VS 使用dask为 132.7574 秒) - 13,361 个文件/46.30 GB 的超大目录(使用
pandas为 3166.0797 秒 VS 使用dask为 3265.4251 秒)
- 8 个文件/1.12 GB 的小目录(使用
-
remote_analysis最快,dask.delayed,基于:- 8 个文件/1.12 GB 的小目录(使用
pandas为 8.6728 秒 VS 使用dask为 6.0795 秒) - 474 个文件/2.03 GB 的中等目录(使用
pandas时为 149.7931 秒,使用dask时为 37.3509 秒)
- 8 个文件/1.12 GB 的小目录(使用
-
remote_normalization最快,dask.delayed,基于:- 8 个文件/1.12 GB 的小目录(使用
pandas时为 1758.1562 秒,使用dask时为 1431.9895 秒) - 中型和超大型数据集尚未进行基准测试
- 8 个文件/1.12 GB 的小目录(使用
-
注意:
dask测试在dask.delayed()调用中利用pandas.from_csv来最大程度地减少时间
【问题讨论】:
-
我认为这条线是你最大的瓶颈:
self.__gs_bucket.blob(blob_path).upload_from_string。网络操作真的很慢,特别是如果你必须上传 12k 的小文件。可以将规范化的帧写入一个大的临时文件并在最后上传吗?如果您需要将它们作为 12k 个单独的文件保存在 Google Big Blob Storage 上,请将它们压缩并上传 1 个。上传一个大文件总是比几个小文件快 -
我没有任何完善的建议...但是如果以效率为目标,您是否考虑过使用 Spark/PySpark 而不是 Dask/Python?我对 Google 的云环境不太熟悉,但在 AWS 中,我可能会通过将文件从 S3 读取到 EMR 集群来解决这个问题。
-
输入文件是否需要保持
.csv?我想知道切换到.parquet之类的东西是否会加快速度,因为您不必解析列/推断类型? -
我必须坚持使用 python,它们需要保持原来的格式 (
.csv)。请记住,我不仅在寻找有关减少上传到 Google 存储桶的时间的建议(我什至还没有对存储桶上传进行基准测试),而且我还在寻求利用一切可能的方法来减少实际处理的数据帧。 -
@CodeDifferent 不幸的是,文件无法合并。为了克服服务器空间限制,再加上任何需要查明特定文件及其原始格式结合文件的故障排除工作是不可能的。这就是为什么我一直在研究并行操作。
标签: python python-3.x pandas dataframe dask