【问题标题】:Implementing multiple producer and multiple workers results in deadlock实现多个生产者和多个工作者会导致死锁
【发布时间】:2021-11-02 13:01:08
【问题描述】:

我一直在尝试使用 python 中的多处理来实现多生产者和多消费者模型。 生产者从网络上抓取数据,消费者处理数据。 起初我只是实现了两个具有特定功能的函数生产者和消费者,并使用队列在它们之间进行通信,但不知道如何处理完成事件。 然后我使用信号量实现了模型 -

def producer(RESP_q, URL_q, SEM):
    with SEM:
        while True:
            url = URL_q.get()

            if url == "END": 
                break

            RESP = produce_txns(url)
            RESP_q.put(RESP)

def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        resp = RESP_q.get()

        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
 
        RESP_q.task_done()


class Manager:
    def __init__(self):
        self.URL_q = Queue()
        self.RESP_q = JoinableQueue()
        self.max_processes = cpu_count()
        self.SEM = Semaphore(self.max_processes // 2)

    def start(self):
        self.worker = []

        for i in range(0, self.max_processes, 2):
            self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
            self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
        
        url_server(self.URL_q, self.max_processes // 2)
        #Consider URL_q holds -> [*data, *["END"]*(self.max_processes // 2)]

        for worker in self.worker:
            worker.start()

        self.stop()

    def stop(self):
        for worker in self.worker:
            worker.join()

        self.RESP_q.join()
        self.RESP_q.close()
        self.URL_q.close()
        

Manager().start()

当 (In Consumer) RESP_q 为空且 SEM 接近 max_process 并且当解释器满足 while 条件时,此实现失败,SEM 将具有与 max_process 相同的值,并且不会留下任何生产者,并且程序在 get 方法处被阻塞. 我无法解决这个问题。

编辑 1。

@Louis Lac 的实现也是正确的。我更正了我的代码以使用 try-except 块消除死锁。

def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
           resp = RESP_q.get(timeout=0.5)
        except Exception:
           continue

【问题讨论】:

  • 为什么要使用信号量?将while True 无限循环替换为while condition 循环,其中condition 最初为True,并在处理完成后更改为False
  • 有多个消费者和生产者,它与单一生产者和消费者模型略有不同,因为并非所有消费者都能知道所有生产者何时完成。
  • 您可以为生产者创建一个“注册”活动。当生产者注册时,共享计数器会增加。当生产者完成它的最后一个动作是注销,这会减少共享计数器。当共享计数器从非 0 变为 0 时,消费者就可以完成他们的工作。

标签: python python-multiprocessing producer-consumer multiple-processes


【解决方案1】:

这里是一个多消费者多生产者实现的例子。您可以在实例化进程时使用daemon 标志,以便在程序退出时自动关闭它们。然后,您可以使用 JoinableQueue 并加入它们(而不是加入进程),以便在没有剩余要处理的项目时退出程序。

您应该使用if __main__ == "__main__ 来启动程序,而不会导致该程序的递归执行。

from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()

def producer(in_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)
        n = int(item)
        print(n)
        in_queue.task_done()

if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()

    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()

    for i in range(100):
        number_queue.put(i)

    number_queue.join()
    str_queue.join()

【讨论】:

  • 这个实现真的很聪明。如果我不使用 if name == "main" 怎么会有递归执行?
  • 查看SO post 以获得解释(实际上答案似乎对 Unix 系统也有效)。 Python中的多处理涉及实例化多个Python解释器并多次执行主文件,这会导致递归。
  • 如果你大量使用多个生产者和消费者,可以查看multiprocessing.Pool
猜你喜欢
  • 1970-01-01
  • 2012-07-27
  • 1970-01-01
  • 2021-11-14
  • 1970-01-01
  • 2013-03-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多