【发布时间】:2020-02-12 14:05:49
【问题描述】:
我有一个脚本,它使用多处理来打开约 200k .csv 文件并对其执行计算。这是工作流程:
1) 考虑一个包含约 200k .csv 文件的文件夹。每个 .csv 文件都包含以下内容:
.csv 文件示例:
0, 1
2, 3
4, 5
...
~500 rows
2) 脚本将所有 .csv 文件的列表保存在 list()
3) 由于我有 8 个处理器可用,该脚本将包含 ~200k .csv 文件的列表分成 8 个列表。
4) 脚本调用do_something_with_csv() 8 次并行计算。
在线性模式下,执行大约需要 4 分钟。
并行和串行,如果我第一次执行脚本,需要更长的时间。如果我执行第二次、第三次等时间,大约需要 1 分钟。似乎 python 正在缓存某种类型的 IO 操作?看起来是因为我有一个进度条,例如,如果我执行到进度条为 5k/200k 并终止程序,下一次执行将非常快地经过前 5k 次运行,然后变慢。
Python 版本:3.6.1
伪 Python 代码:
def multiproc_dispatch():
lst_of_all_csv_files = get_list_of_files('/path_to_csv_files')
divided_lst_of_all_csv_files = split_list_chunks(lst_of_all_csv_files, 8)
manager = Manager()
shared_dict = manager.dict()
jobs = []
for lst_of_all_csv_files in divided_lst_of_all_csv_files:
p = Process(target=do_something_with_csv, args=(shared_dict, lst_of_all_csv_files))
jobs.append(p)
p.start()
# Wait for the worker to finish
for job in jobs:
job.join()
def read_csv_file(csv_file):
lst_a = []
lst_b = []
with open(csv_file, 'r') as f_read:
csv_reader = csv.reader(f_read, delimiter = ',')
for row in csv_reader:
lst_a.append(float(row[0]))
lst_b.append(float(row[1]))
return lst_a, lst_b
def do_something_with_csv(shared_dict, lst_of_all_csv_files):
temp_dict = lambda: defaultdict(self.mydict)()
for csv_file in lst_of_all_csv_files:
lst_a, lst_b = read_csv_file(csv_file)
temp_dict[csv_file] = (lst_a, lst_b)
shared_dict.update(temp_dict)
if __name__ == '__main__':
multiproc_dispatch()
【问题讨论】:
-
两个连续的线性运行是否都在 4 分钟左右?
-
我刚刚进行了比较,实际上在第二次运行线性模式后不到 4 分钟。你知道为什么吗?我终止了脚本,它不在同一次执行的循环中。
-
在下面查看我的答案
标签: python python-3.x python-multiprocessing