【问题标题】:Is it possible to manually lock/unlock a Queue?是否可以手动锁定/解锁队列?
【发布时间】:2013-11-05 02:01:22
【问题描述】:

我很好奇是否有办法手动锁定multiprocessing.Queue 对象。

我设置了一个非常标准的生产者/消费者模式,其中我的主线程不断产生一系列值,multiprocessing.Process 工人池正在对产生的值起作用。

这一切都通过一个单独的multiprocessing.Queue()控制。

import time
import multiprocessing


class Reader(multiprocessing.Process): 
    def __init__(self, queue): 
        multiprocessing.Process.__init__(self)
        self.queue = queue 

    def run(self): 
        while True: 
            item = self.queue.get() 
            if isinstance(item, str): 
                break 


if __name__ == '__main__': 

    queue = multiprocessing.Queue()
    reader = Reader(queue)
    reader.start()

    
    start_time = time.time()
    while time.time() - start_time < 10: 
        queue.put(1)
    queue.put('bla bla bla sentinal')
    queue.join() 

我遇到的问题是我的工作池不能像主线程向其中插入值一样快地消耗和处理queue。所以过了一段时间后,Queue 太笨重了,它会弹出一个 MemoryError。

一个明显的解决方案是简单地在生产者中添加一个等待检查,以阻止它向队列中添加更多值。大致如下:

while time.time() - start_time < 10: 
    queue.put(1)
    while queue.qsize() > some_size:
        time.sleep(.1)
queue.put('bla bla bla sentinal')
queue.join() 

但是,由于程序的时髦特性,我想将队列中的所有内容转储到文件中以供以后处理。但!由于无法临时锁定队列,工作人员无法消耗其中的所有内容,因为生产者不断用垃圾填充它 - 无论如何从概念上讲。经过无数次测试后,似乎在某些时候其中一个锁获胜(但通常是添加到队列中的那个)。

编辑:另外,我意识到可以简单地停止生产者并从该线程中消费它......但这让我心中的单一责任人感到难过,因为生产者是生产者,而不是消费者.

编辑:

看了Queue的源码后,想到了这个:

def dump_queue(q):
    q._rlock.acquire()
    try:
        res = []
        while not q.empty():
            res.append(q._recv())
            q._sem.release()
        return res
    finally:
        q._rlock.release()    

但是,我太害怕使用它了!我不知道这是否“正确”。我没有足够的把握知道这是否会在不炸毁任何Queues 内部的情况下成立。

有人知道这会不会坏吗? :)

【问题讨论】:

  • 你可以只使用multiprocessing.Queue的maxsize参数来限制队列中一次可以有多少项目?
  • @AustinPhillips 不幸的是没有。它不会完全解决问题。因为生产者正在从流中读取以生成其值,所以我想尽可能长时间地保持它“向上”。如果消费者在处理过程中落后,我宁愿快速将队列转储到磁盘并稍后处理它,而不是阻止生产者读取它的输入源。
  • 也许你真的不需要像这样锁定队列。只需添加另一个消费者,其工作就是将队列保持在合理的水平。它可以轮询queue.qsize(),如果它变得太大,则使用队列中的项目并存储到文件中。
  • 没有记录的方法来锁定mp.Queue - 但即使有,它有什么好处?当它被锁定时,没有进程可以从中取出项目。

标签: python multiprocessing


【解决方案1】:

鉴于 cmets 中所说的内容,Queue 对您的问题来说只是一个错误的数据结构 - 但可能部分是一个可用的解决方案。

听起来您只有一个制作人。创建一个新的本地生产者(跨进程共享)类,实现您真正需要的语义。例如,

class FlushingQueue:
    def __init__(self, mpqueue, path_to_spill_file, maxsize=1000, dumpsize=1000000):
        from collections import deque
        self.q = mpqueue  # a shared `multiprocessing.Queue`
        self.dump_path = path_to_spill_file
        self.maxsize = maxsize
        self.dumpsize = dumpsize
        self.d = deque()  # buffer for overflowing values

    def put(self, item):
        if self.q.qsize() < self.maxsize:
            self.q.put(item)
            # in case consumers have made real progress
            while self.d and self.q.qsize() < self.maxsize:
                self.q.put(self.d.popleft())
        else:
            self.d.append(item)
            if len(self.d) >= self.dumpsize:
                self.dump()

    def dump(self):
        # code to flush self.d to the spill file; no
        # need to look at self.q at all

我打赌你可以做到这一点:-)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-30
    • 2017-04-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-10
    相关资源
    最近更新 更多