【发布时间】:2017-03-08 13:42:14
【问题描述】:
我正在尝试了解 Python 中的多处理,但目前正在努力解决以下问题:
从工作人员池开始,我想将生成器函数中的对象提供给队列,然后由工作人员使用。这很好用,但是我现在想扩展我的程序以允许工作人员将工作添加到队列中。然而,这是我遇到问题的部分,因为我在第一个循环中添加的工作紧跟在第二个循环中添加的停止代码(参见示例代码)。这意味着任何工人添加的任何额外工作将永远不会被执行......
我认为唯一需要的方法是检查队列是否为空以及所有工作人员都没有做任何事情,然后继续执行最后一个停止工作人员的 for 循环。但是我不知道如何检查工人的状态来做到这一点。
显示示例的最少代码:
import multiprocessing, time, random
def f(queue):
worker_name = multiprocessing.current_process().name
print "Started: {}".format(worker_name)
while True:
value = queue.get()
if value is None:
break
print "{} is processing '{}'".format(worker_name, value)
# compute(value)
time.sleep(1)
# Worker may add additional work to queue
if random.random() > 0.7:
queue.put("Extra work!")
print "Stopping: {}".format(worker_name)
n_workers = 4
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(n_workers, f, (queue,))
# Feed large objects from generator
for i in xrange(20):
queue.put(i)
# All extra work is skipped
# Terminate workers after finishing work
for __ in xrange(n_workers):
queue.put(None)
pool.close()
pool.join()
print "Finished!"
print queue.get() # Will yield 'Extra Work!' should be empty
【问题讨论】:
标签: python python-multiprocessing