【发布时间】:2020-12-18 16:27:20
【问题描述】:
我有一个应用程序,它可以读取 50 个大型 csvs 文件,每个文件大约 400MB。现在我正在阅读这些以创建一个数据框,并最终将所有这些连接成一个单独的数据框。我想并行执行此操作以加快整个过程。所以我下面的代码看起来像这样:
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time
Class dataProvider:
def __init__(self):
self.df=pd.DataFrame()
self.pool = ThreadPool(processes=40)
self.df_abc=pd.DataFrame()
self.df_xyz=pd.DataFrame()
self.start=time()
def get_csv_data(self,filename):
return pd.read_csv(filename)
def get_all_csv_data(self,filename):
self.start=time()
df_1 = self.pool.apply_sync(self.get_csv_data,('1.csv',), callback=concatDf)
df_2 = self.pool.apply_sync(self.get_csv_data,('2.csv',), callback=concatDf)
total_time=time()-self.start
def concatDf(self):
self.df_abc=pd.concat([df_1,df_2])
self.df_xyz=self.df_abc.iloc[:,1:]
return self.df_xyz
我看到以下代码问题:
- 如果我的 apply_sync 调用调用了相同的回调,那么我怎么知道当前回调是由上面 df_1 行或 df_2 中的哪个调用调用的? 2)我想连接不同apply_sync的输出,在concatDf回调函数中怎么做?
- 我如何知道所有 apply_sync 调用的回调已完成,以便我可以返回所有 50 个 csv 的串联数据帧?
- 有没有更好更有效的方法来做到这一点?
谢谢
【问题讨论】:
-
你受内存限制吗?
-
@mrzo 我有足够的 RAM 超过 300GB,想法是并行读取和连接这些文件,因为每个文件可能需要 30 秒。所以理想情况下我不想要 30*50 然后开始连接过程。谢谢
-
一次连接数据帧比像您尝试那样迭代地连接它们更有效。那你为什么要这样做?你试过我的答案吗?
-
@mrzo 是的,将通过您的回答并更新您。感谢您的帮助
标签: python-3.x pandas multithreading threadpool