【发布时间】:2020-05-09 16:17:05
【问题描述】:
在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行(2+)个进程,我创建了一个最小的代表性示例,如下所示。
我的问题是:
1) 我的理解是,由于我有不同的功能,我不能使用Pool.map_async() 及其变体。那正确吗?我见过的唯一例子是重复相同的功能,比如this answer。
2) 使此设置永久运行的最佳做法是什么?在下面的代码中,我使用了 while 循环,我怀疑它不适合此目的。
3) 我使用Process 和Manager 的方式是否最佳?我使用multiprocessing.Manager.dict() 作为共享字典从进程中返回结果。我在this answer 的评论中看到,在这里使用Queue 是有意义的,但是Queue 对象没有`.dict()' 方法。所以,我不确定这将如何工作。
如果有任何改进和建议使用示例代码,我将不胜感激。
import numpy as np
import pandas as pd
import multiprocessing
import time
def worker1(name, t , seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
time.sleep(t)
np.random.seed(seed)
df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
def worker2(name, t, seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
np.random.seed(seed)
time.sleep(t)
df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
if __name__ == '__main__':
t=1
while True:
start_time = time.time()
manager = multiprocessing.Manager()
parallel_dict = manager.dict()
seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
jobs = []
p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
jobs.append(p1)
jobs.append(p2)
p1.start()
p2.start()
for proc in jobs:
proc.join()
parallel_end_time = time.time() - start_time
#print(parallel_dict)
df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
merged_df = pd.concat([df1,df2], axis=0)
print(merged_df)
【问题讨论】:
标签: python-3.x multiprocessing queue scheduled-tasks