【问题标题】:Python passing queue objects to multiprocessing.Pool of processesPython将队列对象传递给multiprocessing.Pool of processes
【发布时间】:2017-11-07 16:00:29
【问题描述】:

问题

我的应用程序中有一个共享的multiprocessing.Pool 对象,它使用两个队列对象(一个用于作业,另一个用于结果)进行了初始化。

如何将任意队列对象发送到作业队列并让进程将结果发送到此备用队列?

job_q.put_nowait((item, alt_q)) # Raises an exception.

这种方法在执行多线程时效果很好,但在执行多处理时却不行。

示例

下面的示例代码演示了我想要实现的目标。我使用两个 multiprocessing.Queue 对象 job_qres_q 初始化池。好吧,事实上,它们是由 multiprocessing.Manager 创建的代理。 run 函数是每个进程的运行循环,它监视作业队列中的项目并将项目简单地添加到结果队列中。 (一个单独的线程正在监视结果队列并打印到标准输出)。

import multiprocessing as mp
import queue
import threading
import time
import os

def run(job_queue, result_queue):
    """ Run-loop for each process.
    """
    print("Starting process {}".format(os.getpid()))
    while True:
        job_q = job_queue
        res_q = result_queue
        try:
            # `item` is just a string
            # `opt_queue` is an optional result queue to use
            item, opt_queue = job_q.get(True, 0.05)
            if opt_queue is not None:
                res_q = opt_queue
            item = item + " Processed"
            res_q.put_nowait(item)
        except queue.Empty:
            continue


def monitor_queue(mp_queue):
    """ The target of a monitoring thread.
    """
    while True:
        try:
            item = mp_queue.get(True, 0.05)
            print("Got `{}`".format(item))
        except queue.Empty:
            continue

if __name__ == '__main__':
    m = mp.Manager()
    job_q = m.Queue()
    res_q = m.Queue()
    alt_q = m.Queue()
    # Monitor `res_q` for items
    threading.Thread(target=monitor_queue, args=(res_q,)).start()
    # Monitor `alt_q` for items
    threading.Thread(target=monitor_queue, args=(alt_q,)).start()
    # `run` is called by each process, share `job_q` and `res_q` with all processes
    pool = mp.Pool(2, run, (job_q, res_q))
    time.sleep(1)
    # Add an item to `job_q` and `None` means send result to `res_q`
    print('Putting first item into the job queue')
    job_q.put_nowait(('#1', None))  # prints... Got `#1`
    time.sleep(1)
    # Add an item to `job_q` and send result to `alt_q`
    print('Putting second item into the job queue and passing alternative result queue')
    job_q.put_nowait(('#2', alt_q))  # TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
    pool.close()
    pool.terminate()

这会因错误而退出

Putting second item into the job queue and passing alternative result queue
Traceback (most recent call last):
  File "/Users/daniel/Desktop/pydebug/mp_example.py", line 54, in <module>
    job_q.put_nowait(('#1', alt_q))  # TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
  File "<string>", line 2, in put_nowait
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 881, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
---------------------------------------------------------------------------

【问题讨论】:

    标签: python python-multiprocessing


    【解决方案1】:

    我认为您不能在消息中传递队列,因为它不可序列化。

    【讨论】:

    • 我需要发送共享内存对象,以便两个进程共享同一个对象。我想我也许可以这样做,它适用于线程,但不适用于多处理。谢谢。
    猜你喜欢
    • 2016-07-22
    • 1970-01-01
    • 2018-03-05
    • 2018-11-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-21
    相关资源
    最近更新 更多