【问题标题】:How can I speed up these dataframe operations on 12k files/50gb?如何加快 12k 文件/50gb 上的这些数据帧操作?
【发布时间】:2021-06-06 23:00:01
【问题描述】:

背景:

我有 12,000 个 csv 文件 (50gb) 的数据,它们大多具有相同的格式,但有些可能缺少一两列,有些标题行可能并不总是从文件的第一行开始。

我有一个包含几个函数的类,这些函数利用 pandas 来分析和规范化存储在本地或来自谷歌存储桶的这些 csv 文件。

在这些函数中发生以下操作:

analyze_files

  • 遍历所有文件,“查看”其内容以确定标题以及是否需要跳过任何行才能到达标题行。
  • 将所有收集的标头转换为标准格式,从文件名中删除除字母数字和下划线之外的所有标头。

normalize_files

  • 遍历所有文件,这次完全加载每个文件。
  • 将列标题从analyze_files 转换为标头的standardizwd 版本。
  • 上传或保存文件的更新版本

函数按预期工作。但是,我正在寻找可以用来加快速度的方法。
使用以下版本(简化为 mvce)和 12,000 个本地文件(8 核 16gb ram)

  • analyze_files 大约需要 2-4 分钟
  • normalize_files 大约需要 52 分钟
from google.cloud import storage
import pandas as pd
import glob
import os
import re

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./service_account_details.json"


class MyClass(object):
    def __init__(self, uses_gs=False, gs_bucket_name=None, gs_folder_path=None):
        self.__uses_gs = uses_gs
        if uses_gs:
            self.__gs_client = storage.Client()
            self.__gs_bucket_name = gs_bucket_name
            self.__gs_bucket = self.__gs_client.get_bucket(gs_bucket_name)
            self.__gs_folder_path = gs_folder_path
        else:
            # save to a subfolder of current directory
            self.__save_location = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.__name__)
            if not os.path.exists(self.__save_location):
                os.mkdir(self.__save_location)
        self.__file_analysis = dict()
        self.__file_columns = set()
        self.__file_column_mapping = dict()
    
    def analyze_files(self):
        # collect the list of files
        files_to_analyze = list()
        if self.__uses_gs:
            gs_files = self.__gs_client.list_blobs(self.__gs_bucket, prefix=self.__gs_folder_path, delimiter="/")
            for file in gs_files:
                if file.name == self.__gs_folder_path:
                    continue
                gs_filepath = f"gs://{self._gs_bucket_name}/{file.name}"
                files_to_analyze.append(gs_filepath)
        else:
            local_files = glob.glob(os.path.join(self.__save_location, "*.csv"))
            files_to_analyze.extend(local_files)
                
        # analyze each collected file
        for filepath in files_to_analyze:
            # determine how many rows to skip in order to start at the header row,
            # then collect the headers for this particular file, to be utilized for comparisons in `normalize_files`
            skiprows = None
            while True:
                try:
                    df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                    break
                except pd.errors.ParserError as e:
                    try:
                        start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                        skiprows = int(start_row_index) - 1
                    except IndexError:
                        print("Could not locate start_row_index in pandas ParserError message")
                        continue
            headers = df.columns.values.tolist()
            self.__file_columns.update(headers)
            # store file details as pandas parameters, so we can smoothly transition into reading the files efficiently
            skiprows = skiprows + 1 if skiprows else 1  # now that we know the headers, we can skip the header row
            self.__file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
            
        # convert the columns to their bigquery-compliant equivalents
        non_alpha = re.compile(r"([\s\W]|^\d+)")
        multi_under = re.compile(r"(_{2,})")
        self.__file_column_mapping.update({
            file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
            for file_column in self.__file_columns
        })

    def normalize_files(self):
        # perform the normalizations and upload/save the final results
        total_columns = len(self.__file_columns)
        for filepath, params in self.__file_analysis.items():
            df = pd.read_csv(filepath, **params)
            # rename the column header to align with bigquery columns
            df.rename(columns=self.__file_column_mapping, inplace=True)
 
            if len(params["names"]) != total_columns:
                # swap the missing column names out for the bigquery equivalents
                missing_columns = [self.__file_column_mapping[c] for c in self.__file_columns - set(params["names"])]
                # add the missing columns to the dataframe
                df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
 
            if self.__uses_gs:
                blob_path = filepath[5 + len(self.__gs_bucket_name) + 1:]  # "gs://" + "{bucket_name}" + "/"
                self.__gs_bucket.blob(blob_path).upload_from_string(df.to_csv(index=False), "text/csv")
            else:  # save locally
                df.to_csv(filepath, index=False)

我考虑过使用dask,结合multiprocessing 模块中的ProcessPoolThreadPool。但是,我正在为究竟采取什么方法而苦苦挣扎。

由于数据帧操作受 CPU 限制,它们似乎最适合 dask,可能与 ProcessPool 结合使用,将 12k 文件分配到 8 个可用内核中,然后 dask 将利用每个内核的线程核心(克服 GIL 限制)。

将文件上传回磁盘或谷歌存储桶似乎更适合ThreadPool,因为该活动是网络绑定的。

至于从 Google 存储桶中读取文件,我不确定哪种方法最有效。

基本上可以归结为两种风景:

  1. 在处理本地文件时,哪些方法/逻辑表现最佳?
  2. 在从 Google 存储桶中提取和保存(覆盖/更新)存储桶时,哪些方法/逻辑表现最佳?

有人可以提供一些可以为上述两个功能提供最有效速度提升的方向或代码吗?

我们将非常感谢基准测试,因为我在一周的大部分时间里一直在思考这个话题,如果有统计数据来支持方法的决定,那就太好了。

我尝试过的当前基准

def local_analysis_test_dir_pd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))
    for filepath in local_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    continue
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)

    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def local_analysis_test_dir_dd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))
    
    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns
    
    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in local_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))

def remote_analysis_test_dir_pd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif file.name.endswith(".xlsx"):
            continue
        elif not file.name.endswith(".csv"):
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)

    for filepath in remote_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    continue
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)

    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def remote_analysis_test_dir_dd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif file.name.endswith(".xlsx"):
            continue
        elif not file.name.endswith(".csv"):
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)

    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns

    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in remote_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def normalization_plain_with_pd(file_analysis, file_columns, file_column_mapping, meta_columns):
    total_columns = len(file_columns)
    for filepath, params in file_analysis.items():
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_pd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_pd", fname])
        df.to_csv(new_path, index=False)

def normalization_plain_with_dd(file_analysis, _file_columns, _file_column_mapping, _meta_columns):
    def dask_worker(file_item, file_columns, file_column_mapping, meta_columns):
        total_columns = len(file_columns)
        filepath, params = file_item
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_dd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_dd", fname])
        df.to_csv(new_path, index=False)
    dask_futures = [
        dask.delayed(dask_worker)(file_item, _file_columns, _file_column_mapping, _meta_columns)
        for file_item in file_analysis.items()
    ]
    dask.compute(*dask_futures)

if __name__ == "__main__":
    for size, params in local_dirs.items():
        print(f"['{size}_local_analysis_dir_tests'] ({params['items']} files, {params['size']})")
        local_analysis_test_dir_pd(params["directory"])
        local_analysis_test_dir_dd(params["directory"])

    for size, settings in local_dirs.items():
        print(f"['{size}_pre_test_file_cleanup']")
        for file in glob.glob(os.path.join(settings["directory"], '*', '*.csv')):
            os.remove(file)
        print(f"['{size}_local_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        files, columns, column_mapping = local_analysis_test_dir_pd(settings["directory"])

        local_normalization_plain_with_pd(files, columns, column_mapping, {})
        local_normalization_plain_with_dd(files, columns, column_mapping, {})

    for size, settings in remote_dirs.items():
        print(f"['{size}_remote_analysis_dir_tests'] ({settings['items']} files, {settings['size']})")
        _, _, _ = remote_analysis_test_dir_pd(settings["directory"])
        files, columns, column_mapping = remote_analysis_test_dir_dd(settings["directory"])

        print(f"['{size}_remote_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        normalization_plain_with_pd(files, columns, column_mapping, {})
        normalization_plain_with_dd(files, columns, column_mapping, {})

到目前为止的结论:

  • local_analysis 最快,pandas.from_csv,基于:

    • 343 MB 的单个文件(使用 pandas 为 0.0210 秒 VS 使用 dask 为 0.5141 秒)
    • 8 个文件/1.12 GB 的小目录(使用 pandas 为 0.1263 秒,使用 dask 为 0.1357 秒)
    • 474 个文件/2.03 GB 的中等目录(使用 pandas 为 3.2991 秒,使用 dask 为 3.7717 秒)
    • 13,361 个文件/46.30 GB 的 xlarge 目录(使用 pandas 时为 131.5941 秒,使用 dask 时为 132.6982 秒)
  • local_normalization 最快,pandas.from_csv,基于:

    • 8 个文件/1.12 GB 的小目录(使用 pandas 为 61.2338 秒,使用 dask 为 62.2033 秒)
    • 474 个文件/2.03 GB 的中等目录(使用 pandas 为 136.8900 秒 VS 使用 dask 为 132.7574 秒)
    • 13,361 个文件/46.30 GB 的超大目录(使用 pandas 为 3166.0797 秒 VS 使用 dask 为 3265.4251 秒)
  • remote_analysis 最快,dask.delayed,基于:

    • 8 个文件/1.12 GB 的小目录(使用 pandas 为 8.6728 秒 VS 使用 dask 为 6.0795 秒)
    • 474 个文件/2.03 GB 的中等目录(使用 pandas 时为 149.7931 秒,使用 dask 时为 37.3509 秒)
  • remote_normalization 最快,dask.delayed,基于:

    • 8 个文件/1.12 GB 的小目录(使用 pandas 时为 1758.1562 秒,使用 dask 时为 1431.9895 秒)
    • 中型和超大型数据集尚未进行基准测试
  • 注意: dask 测试在 dask.delayed() 调用中利用 pandas.from_csv 来最大程度地减少时间

【问题讨论】:

  • 我认为这条线是你最大的瓶颈:self.__gs_bucket.blob(blob_path).upload_from_string。网络操作真的很慢,特别是如果你必须上传 12k 的小文件。可以将规范化的帧写入一个大的临时文件并在最后上传吗?如果您需要将它们作为 12k 个单独的文件保存在 Google Big Blob Storage 上,请将它们压缩并上传 1 个。上传一个大文件总是比几个小文件快
  • 我没有任何完善的建议...但是如果以效率为目标,您是否考虑过使用 Spark/PySpark 而不是 Dask/Python?我对 Google 的云环境不太熟悉,但在 AWS 中,我可能会通过将文件从 S3 读取到 EMR 集群来解决这个问题。
  • 输入文件是否需要保持.csv?我想知道切换到 .parquet 之类的东西是否会加快速度,因为您不必解析列/推断类型?
  • 我必须坚持使用 python,它们需要保持原来的格式 (.csv)。请记住,我不仅在寻找有关减少上传到 Google 存储桶的时间的建议(我什至还没有对存储桶上传进行基准测试),而且我还在寻求利用一切可能的方法来减少实际处理的数据帧。
  • @CodeDifferent 不幸的是,文件无法合并。为了克服服务器空间限制,再加上任何需要查明特定文件及其原始格式结合文件的故障排除工作是不可能的。这就是为什么我一直在研究并行操作。

标签: python python-3.x pandas dataframe dask


【解决方案1】:

就像 Code Different 所说的那样,upload_from_string 位需要一段时间。您是否考虑过将它们写入 Google BigQuery,而不是将它们保存为存储桶中的 .csv 文件?我发现这对我的目的来说更快。

【讨论】:

  • 它们最终会在稍后被摄取到 bigquery 中,但需要先上传到 Google 存储桶。虽然我认识到upload_from_string 是使用该类的 Google 存储选项时的瓶颈,但我仍然需要尽可能长时间地进行本地处理。这就是为什么我希望有人可以利用我提到的三个部分的组合提供解决方案:daskThreadPools 和ProcessPools。使用这些至少应将保存/上传时间减少 8 倍 [核心](网络允许)。
【解决方案2】:

delayed API 可能适合这里。您提供的课程相当详尽,但这是可能适用于这种情况的粗略模式:

import dask

@dask.delayed
def analyze_one_file(file_name):
    # use the code you run on a single file here
    return dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))

# form delayed computations
delayed_values = [analyze_one_file(filepath) for filepath in files_to_analyze]

# execute the delayed computations
results = dask.compute(delayed_values)

# now results will be a list of dictionaries (or whatever
# the delayed function returns)

# apply similar wrapping to normalize_files loop

对于您的情况,可能有一个更有效的 ETL 过程,但这是特定于情况的,因此假设有必要遍历文件以发现要跳过的行数,然后用 delayed 包装起来可能足以将 df 处理时间减少核心倍数。

【讨论】:

  • 不幸的是,我已经对这种方法进行了基准测试,它相当于使用普通的pandas 方法。我正在使用我使用的当前基准和功能更新我的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-11-28
  • 1970-01-01
  • 2020-08-14
  • 1970-01-01
  • 2023-01-20
  • 2022-11-28
  • 1970-01-01
相关资源
最近更新 更多