【问题标题】:Deadlock in python multiprocessing queuepython多处理队列中的死锁
【发布时间】:2018-08-08 02:02:17
【问题描述】:

我正在使用多处理库中的队列在进程之间共享数据。

我有 2 个队列,都限制为 10 个对象,第一个队列有一个进程将对象“放入”其中,许多进程从中“获取”。

第二个队列有许多进程将对象“放入”其中,而只有一个进程从中“获取”。

系统在一段时间内完美运行,然后开始出现奇怪的行为:只有将对象“放入”第一个队列的进程继续运行,而 从第一个队列读取的进程显然不再运行/工作(即使进程还活着)。 这里似乎有一个死锁,但我不确定,这是我的代码:

更新

import multiprocessing
import logging
from multiprocessing import Process

logger = logging.get_logger(__name__)

# Processes 2, 3 ,4:

class Processes_234(Process):
    def __init__(self, message_queue_1, message_queue_2):
        Process.__init__(self)
        self.message_queue_1 = message_queue_1
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el3 = self.message_queue_1.get()
                logger.debug('Processes234: get from queue')
            except Exception as exp:
                logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))

            # do some stuff with el1, el2, el3...

            try:
                # put into second queue
                self.message_queue_2.put_nowait((el1, el2, el3))
                logger.debug('Processes234: put into queue')
            except Exception as excpt:
                logger.debug(excpt)
                logger.debug("message_queue_2: queue is full")
                # the queue is full so replace the old element with the new one
                try:
                    self.message_queue_2.get_nowait()
                    self.message_queue_2.put_nowait((el1, el2, el3))
                    # in case other process already fill the queue - ignore
                except:
                    pass


# process 5:
class Process5(Process):
    def __init__(self, message_queue_2):
        Process.__init__(self)
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el = self.message_queue_2.get()
                print('Process5: get from queue')

            except Exception as exp:
                print("message_queue_2: queue empty, Exception message: " + str(exp))


def start_process_1():
    # init queues
    message_queue_1 = multiprocessing.Queue(maxsize=10)
    message_queue_2 = multiprocessing.Queue(maxsize=10)

    processes_234 = [Processes_234(message_queue_1, message_queue_2)
                     for _ in range(3)]

    for proc in processes_234:
        proc.start()

    process5 = Process5(message_queue_2)
    process5.start()
    counter = 1

    while True:
        el1 = counter + 1
        el2 = counter + counter
        el3 = "some string " * ((counter ** 2) % 60000)
        counter += 1
        # start passing data
        try:

            # put into queue
            message_queue_1.put_nowait((el1, el2, el3))
            logger.debug('Process1: put into queue')
        except Exception as excpt:
            logger.debug(excpt)
            logger.debug("message_queue_1: queue is full")
            # the queue is full so replace the old element with the new one
            try:
                message_queue_1.get_nowait()
                message_queue_1.put_nowait((el1, el2, el3))
                # in case other process already fill the queue - ignore
            except:
                pass


if __name__ == '__main__':
    start_process_1()

有人知道我的问题是什么吗?

我正在使用 python 3.6.5

【问题讨论】:

  • 看不到您发布的内容有任何直接问题,但话又说回来,很难尝试重现您的问题,因为我们必须对代码的执行方式做出假设(您的 @987654323 @objects 和你的 __main__ 看起来像)。请创建一个MCVE,并解释您是如何确定您的Process_234 对象停止工作的。
  • 谢谢,我解决了,并根据您的要求将代码更新给重现此案例的人。

标签: python python-3.x multiprocessing message-queue deadlock


【解决方案1】:

最后我能够解决问题,是记录器! 根据日志记录库,记录器是线程安全的,但不是多进程安全的。

我更改了代码,使每个进程都有自己的记录器,它解决了这个问题。

【讨论】:

  • 很好,感谢您更新您的问题。这样,它实际上可能会帮助其他有类似问题的人
  • 如果我没有找到这个问题,我不知道我需要多长时间才能解决这个问题,谢谢!
猜你喜欢
  • 1970-01-01
  • 2015-10-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-11
  • 2013-11-04
  • 1970-01-01
相关资源
最近更新 更多