【问题标题】:parallelize for loop and merge pandas dataframes并行化 for 循环并合并 pandas 数据帧
【发布时间】:2022-01-01 14:42:17
【问题描述】:

我的脚本如下

import pandas as pd

df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                      'A': ['A0', 'A1', 'A2', 'A3']})

def make_df(year):
    df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'], str(year): [str(year), str(year+1), str(year+2), str(year+3)]})
    return df

for year in range(2020, 2015, -1):
        df = pd.merge(df, make_df(year), on=['key'], how='left')

最终的 df 将是..

  key   A  2020  2019  2018  2017  2016
0  K0  A0  2020  2019  2018  2017  2016
1  K1  A1  2021  2020  2019  2018  2017
2  K2  A2  2022  2021  2020  2019  2018
3  K3  A3  2023  2022  2021  2020  2019

我的实际make_new_df(year) 要复杂得多,而且需要太多时间。

如何并行化 for 循环 for year in range(2020, 2015, -1): 并缩短处理时间?

【问题讨论】:

  • 您可以尝试使用标准模块threadingmultiprocessingrayjoblibpyspark等外部模块,这些模块可能对DataFrame有一些功能。甚至可能还有一个我不记得名字的模块 - pandas-??? - 它可以将多处理添加到 DataFrame
  • 感谢您的评论。我尝试了一些模块,如 multiprocessing 或 dask,但未能使用它们。我找不到任何解释使用它们的详细方法的文件。我所发现的只是关于一个数据帧中的多处理,而不是关于将多个数据帧合并为一个。你有什么推荐的文件吗?
  • 您可以在分离的线程/进程中生成新数据,但稍后您必须将它们加入主进程。
  • 其他想法:在Google Colab服务器上发送数据,运行代码并下载结果可能会更快:)

标签: python pandas dataframe parallel-processing


【解决方案1】:
编辑:使用multiprocessing 而不是threading

阅读您的 cmets 后,您似乎想在不同的进程(并行)中运行您的函数:

import multiprocessing
import pandas as pd

df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                      'A': ['A0', 'A1', 'A2', 'A3']})
year_start = 2020
year_stop = 2015
year_range = range(year_start, year_stop, -1)

def make_df(year):
    df = pd.DataFrame({str(year): [str(year), str(year+1), str(year+2), str(year+3)]})
    return df

pool = multiprocessing.Pool(year_start - year_stop)
df_list = pool.map(func=make_df, iterable=year_range)
pool.close()
pool.join()

df = df.join(df_list)
print(df)

【讨论】:

  • 我不知道您的数据实际上是什么样的。您仍然可以将mergedf_list 循环中的密钥一起使用。在您的示例中似乎没有必要,因为您总是在同一列上合并,所以我最初用 join 替换它(并从函数中删除了键列)
  • 非常感谢。您的脚本适用于有关加入的一些小修改。但是,我不知道为什么,它加倍了时间。当然,它在运行时会使用更多的 CPU(%)。我的代码中似乎有一些特殊之处。
  • 一定是因为Python GIL。我已更新我的答案以使用multiprocessing。如果您发现任何改进,请告诉我!
  • 谢谢特兰比。执行“pool=multiprocessing.Pool(year_start - year_stop)”时出错。我一直在努力,但找不到解决方案。有什么我想念的吗?
  • 你在我的代码中定义了year_startyear_stop 吗?第一个必须更大,因为它定义了池的大小。您是否导入了 multiprocessing 模块?
猜你喜欢
  • 2021-09-26
  • 1970-01-01
  • 1970-01-01
  • 2018-08-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-09
  • 2011-08-23
相关资源
最近更新 更多