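【Posted】:2017-11-07 16:00:29
【Question】:
Problem
I have a shared multiprocessing.Pool object in my application, initialized with two queue objects (one for jobs, the other for results).
How can I send an arbitrary queue object through the job queue and have the worker process send its result to that alternative queue?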
job_q.put_nowait((item, alt_q)) # Raises an exception.
This approach works fine with multithreading, but not with multiprocessing.
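For contrast, here is a minimal sketch of the threaded case, where putting one queue inside another works because threads share the same objects and nothing has to be pickled. The names `worker`, `first`, and `second` are illustrative only and are not part of the original application.

```python
import queue
import threading


def worker(job_q, default_q):
    """Handle one job; use the job's own result queue if one was supplied."""
    item, opt_q = job_q.get()
    target = opt_q if opt_q is not None else default_q
    target.put(item + " Processed")


job_q, res_q, alt_q = queue.Queue(), queue.Queue(), queue.Queue()
job_q.put(("#1", None))    # no alternative queue: result goes to res_q
job_q.put(("#2", alt_q))   # the queue object itself travels with the job

# One thread per job, so each worker handles exactly one item.
threads = [threading.Thread(target=worker, args=(job_q, res_q)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

first = res_q.get_nowait()
second = alt_q.get_nowait()
print(first, "|", second)
```

With processes, the same `put` has to serialize `alt_q` and send it to another process, which is where the two cases diverge.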
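Example
The example code below demonstrates what I'm trying to achieve. I initialize the pool with two multiprocessing.Queue objects, job_q and res_q. Well, actually, they are proxies created by multiprocessing.Manager. The run function is the run loop for each process: it watches the job queue for items and simply adds each item to the result queue. (A separate thread watches the result queue and prints to stdout.)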
import multiprocessing as mp
import queue
import threading
import time
import os


def run(job_queue, result_queue):
    """ Run-loop for each process.
    """
    print("Starting process {}".format(os.getpid()))
    while True:
        job_q = job_queue
        res_q = result_queue
        try:
            # `item` is just a string
            # `opt_queue` is an optional result queue to use
            item, opt_queue = job_q.get(True, 0.05)
            if opt_queue is not None:
                res_q = opt_queue
            item = item + " Processed"
            res_q.put_nowait(item)
        except queue.Empty:
            continue


def monitor_queue(mp_queue):
    """ The target of a monitoring thread.
    """
    while True:
        try:
            item = mp_queue.get(True, 0.05)
            print("Got `{}`".format(item))
        except queue.Empty:
            continue


if __name__ == '__main__':
    m = mp.Manager()
    job_q = m.Queue()
    res_q = m.Queue()
    alt_q = m.Queue()

    # Monitor `res_q` for items
    threading.Thread(target=monitor_queue, args=(res_q,)).start()
    # Monitor `alt_q` for items
    threading.Thread(target=monitor_queue, args=(alt_q,)).start()

    # `run` is called by each process, share `job_q` and `res_q` with all processes
    pool = mp.Pool(2, run, (job_q, res_q))
    time.sleep(1)

    # Add an item to `job_q` and `None` means send result to `res_q`
    print('Putting first item into the job queue')
    job_q.put_nowait(('#1', None))  # prints... Got `#1`
    time.sleep(1)

    # Add an item to `job_q` and send result to `alt_q`
    print('Putting second item into the job queue and passing alternative result queue')
    job_q.put_nowait(('#2', alt_q))  # TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'

    pool.close()
    pool.terminate()
This exits with the following error:
Putting second item into the job queue and passing alternative result queue
Traceback (most recent call last):
  File "/Users/daniel/Desktop/pydebug/mp_example.py", line 54, in <module>
    job_q.put_nowait(('#1', alt_q)) # TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
  File "<string>", line 2, in put_nowait
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 881, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
---------------------------------------------------------------------------
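Reading the traceback, the exception is raised inside the manager's server process while it unpickles the proxy argument (serve_client, then recv, then RebuildProxy), not inside a worker. One possible workaround, sketched here under assumptions rather than offered as a confirmed fix, is to avoid sending the proxy through the job queue at all: give each worker a dict of the available result queues up front and send only a key with each job. The names `route` and `queues` and the keys "res" and "alt" are hypothetical, and a single mp.Process with a `None` sentinel stands in for the Pool initializer loop from the example to keep the sketch short.

```python
import multiprocessing as mp


def route(item, key, queues):
    """Put the processed item on the result queue registered under `key`."""
    queues[key].put(item + " Processed")


def run(job_q, queues):
    """Worker loop: handle (item, key) jobs until the `None` sentinel arrives."""
    while True:
        job = job_q.get()
        if job is None:
            break
        item, key = job
        route(item, key, queues)


if __name__ == "__main__":
    m = mp.Manager()
    job_q = m.Queue()
    # Hypothetical registry: every result queue a job may target, by name.
    queues = {"res": m.Queue(), "alt": m.Queue()}

    # The proxies are handed over at process start, not through job_q.
    worker = mp.Process(target=run, args=(job_q, queues))
    worker.start()

    job_q.put(("#1", "res"))  # only a plain string key travels with the job
    job_q.put(("#2", "alt"))
    job_q.put(None)           # ask the worker to stop
    worker.join()

    print("res:", queues["res"].get())
    print("alt:", queues["alt"].get())
```

The trade-off is that the set of result queues must be known when the workers start, so this does not cover a truly arbitrary queue created later.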
【Discussion】:
Tags: python python-multiprocessing