【问题标题】:python multiprocessing shared queue re-orderingpython多处理共享队列重新排序
【发布时间】:2013-10-10 03:31:19
【问题描述】:

我有一个服务器和几个客户端。它们都共享一个任务和结果 multiprocessing.Queue。但是,每当客户端完成任务并将结果放入结果队列时,我希望服务器查看结果,并在此基础上重新排序任务队列。

这当然意味着从任务队列中弹出所有内容并重新添加。在这个重新排序过程中,我希望客户端阻止接触任务队列。我的问题是如何让服务器识别何时将任务添加到结果队列并通过锁定任务队列并在保护队列的同时重新排序来做出反应。不变的是,在客户端获得新任务之前,服务器必须在返回每个结果后重新排序。

我想一个简单(但错误)的方法是让 multiprocessing.Value 充当布尔值,每当添加结果时,客户端都会将其翻转为 True,这意味着已添加结果。服务器可以轮询以获取此值,但最终它可能会错过另一个在轮询之间进入并添加另一个结果的客户端。

任何想法表示赞赏。

** 'multithreading' 标签只是因为它的思想与线程非常相似,我认为这里的进程/线程区别并不重要。

【问题讨论】:

  • 我不确定您遇到的问题。问题是如何保护队列或如何在新结果到达时通知服务器?服务器在剩下的时间里需要做什么?我认为它不能仅仅阻塞在 result_queue.get() 中。
  • 服务器是监听客户端连接的地方,也是存储内存所在的地方
  • 一个进程(可能是服务器)可以监听结果队列,但它必须做出反应并在成功时立即尝试锁定任务队列,这与轮询情况相同,有效

标签: python multithreading client-server queue multiprocessing


【解决方案1】:

让我们尝试一些代码 - 有一些进展总比没有好;-) 部分问题是确保如果结果队列中有任何内容,则不会从任务队列中获取任何内容,对吗?所以队列是紧密相连的。这种方法将两个队列都置于锁的保护之下,并使用条件来避免任何轮询:

设置,在服务器中完成。 taskQresultQtaskCondresultCond 必须传递给客户端进程(lock 不需要显式传递 - 它包含在条件中):

import multiprocessing as mp
taskQ = mp.Queue()
resultQ = mp.Queue()
lock = mp.Lock()
# both conditions share lock
taskCond = mp.Condition(lock)
resultCond = mp.Condition(lock)

客户端获得任务;所有客户端都使用此功能。请注意,只要结果队列中有内容,任务就不会被消耗:

def get_task():
    taskCond.acquire()
    while taskQ.qsize() == 0 or resultQ.qsize():
        taskCond.wait()
    # resultQ is empty and taskQ has something
    task = taskQ.get()
    taskCond.release()
    return task

客户有结果:

with resultCond:
    resultQ.put(result)
    # only the server waits on resultCond
    resultCond.notify()

服务器循环:

resultCond.acquire()
while True:
    while resultQ.qsize() == 0:
        resultCond.wait()
    # operations on both queues in all clients are blocked now
    # ... drain resultQ, reorder taskQ ...
    taskCond.notify_all()

注意事项:

  1. qsize() 通常是概率性的,但因为所有队列操作都是在持有锁时完成的,所以在这种情况下它是可靠的。

  2. 其实因为这里所有的队列操作都被我们自己的锁保护了,所以真的没必要用mp.Queues。例如,mp.Manager().list() 也可以工作(任何共享结构)。当您重新安排任务时,也许列表会更容易处理?

  3. 我不太喜欢的部分:当服务器执行taskCond.notify_all() 时,一些客户端可能正在等待获取新任务,而其他客户端可能正在等待返回新结果。它们可以按任何顺序运行。一旦任何等待返回结果的客户端有机会,所有等待获取任务的客户端都会阻塞,但在此之前任务将被消耗。当然,这里的“问题”是我们不知道在实际将某些内容添加到结果队列之前等待新结果。

对于最后一个,也许将“客户端有结果”代码更改为:

resultQ.put(result)
with resultCond:
    resultCond.notify()

会更好。不确定。这确实使推理变得更加困难,因为在我们的锁的保护下完成所有队列操作不再是真的。

【讨论】:

  • 我想出了一些非常相似的东西。您提到的关键见解是,由于不变量,队列在某种意义上是捆绑在一起的。因此一旦锁定,它们就不需要是特殊的线程安全数据结构。我最终让服务器侦听器线程(在客户端连接后为客户端侦听套接字的线程)实际上进行了重新排序。这是最好的,因为这些侦听器实际上会响应已完成的任务。它们的阻塞重排序和结果调度真正结合在一起。
  • 虽然我承认我不太了解 multiprocessing.Condition 类的用法 - 为什么不使用常规锁?
  • 酷!条件有一个学习曲线,但是一旦学习了条件,就可以极大地简化编写正确的并行安全代码。这就是为什么,例如,Python 的 threading.py 在条件之上构建事件、障碍和信号量。一个关键的好处是 - 如上所述 - 条件中的基础锁定永远不会长时间持有。代码要么很快释放锁,要么很快执行.wait()(释放锁,并在.notify() 结束等待时(重新)获取锁)。没有投票,没有猜测。但是条件是建立在锁上的,所以 - 不 - 它们并不是真正重要的。
猜你喜欢
  • 1970-01-01
  • 2014-01-11
  • 2018-06-25
  • 2013-09-06
  • 2020-06-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-08-31
相关资源
最近更新 更多