【问题标题】:Single Producer Multiple Consumer单生产者多消费者
【发布时间】:2015-04-05 14:57:28
【问题描述】:

我希望在执行多线程编程的同时,在 Python 中拥有一个单一的生产者、多个消费者的架构。我希望有这样的操作:

  1. 生产者产生数据
  2. 消费者 1 ..N(N 是预先确定的)等待数据到达(阻塞),然后以不同的方式处理 SAME 数据。

所以我需要所有消费者从生产者那里获取相同的数据。

当我使用 Queue 来执行此操作时,我意识到除了第一个使用者之外的所有人都会对我拥有的实现感到饥饿。

一种可能的解决方案是为每个消费者线程设置一个唯一的队列,其中生产者将相同的数据推送到多个队列中。有没有更好的方法来做到这一点?

from threading import Thread
import time
import random
from Queue import Queue

my_queue = Queue(0)

def Producer():
    global my_queue
    my_list = []
    for each in range (50):
        my_list.append(each)
    my_queue.put(my_list)

def Consumer1():
    print "Consumer1"
    global my_queue
    print my_queue.get()
    my_queue.task_done()

def Consumer2():
    print "Consumer2"
    global my_queue
    print my_queue.get()
    my_queue.task_done()


P = Thread(name = "Producer", target = Producer)

C1 = Thread(name = "Consumer1", target = Consumer1)

C2 = Thread(name = "Consumer2", target = Consumer2)


P.start()

C1.start()

C2.start()

在上面的示例中,C2 被无限期阻塞,因为 C1 消耗 P1 产生的数据。我更希望 C1 和 C2 都能够访问 P1 生成的相同数据。

感谢任何代码/指针!

【问题讨论】:

  • 既然你有 N 个消费者,我会使用线程/进程池

标签: python multithreading queue


【解决方案1】:

一个单一生产者和五个消费者的例子,经过验证。

from multiprocessing import Process, JoinableQueue
import time
import os

q = JoinableQueue()

def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')


def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()

for i in range(5):
    p = Process(target=worker, daemon=True).start()

producers = []
# it is easy to extend it to multi producers.
for i in range(1):
    p = Process(target=producer)
    producers.append(p)
    p.start()

# make sure producers done
for p in producers:
    p.join()

# block until all workers are done
q.join()
print('All work completed')

解释:

  1. 在此示例中,一个生产者和五个消费者。
  2. JoinableQueue 用于确保存储在队列中的所有元素都将被处理。 'task_done' 用于通知工作人员已完成。 'q.join()' 将等待所有标记为完成的元素。
  3. 使用 #2,无需等待每个工人加入。
  4. 但重要的是加入等待生产者将元素存储到队列中。否则程序立即退出。

【讨论】:

    【解决方案2】:

    您的制作人只创建一项工作:

    my_queue.put(my_list)
    

    例如,把 my_list 放两次,两个消费者都工作:

    def Producer():
        global my_queue
        my_list = []
        for each in range (50):
            my_list.append(each)
        my_queue.put(my_list)
        my_queue.put(my_list)
    

    所以这样你就可以将两个作业放到同一个列表的队列中。

    但是我必须警告你:在没有线程同步的情况下修改不同线程中的相同数据通常是个坏主意。

    无论如何,一个队列的方法不适合你,因为一个队列应该由具有相同算法的线程处理。

    因此,我建议您继续为每个消费者设置唯一的队列,因为其他解决方案并不那么简单。

    【讨论】:

    • 什么会阻止同一个消费者获取队列数据的两个副本?
    • @martineau 感谢您指出这一点。现实中什么都没有。更新了答案。
    • 嗯,即使您更新了答案,我仍然不明白您如何处理@martineau 问题?你的意思是有一些重要的解决方案吗?你能提到他们吗?
    【解决方案3】:

    那么每个线程队列怎么样?

    作为启动每个消费者的一部分,您还将创建另一个队列,并将其添加到“所有线程队列”列表中。然后启动生产者,将所有队列的列表传递给它,然后他可以将数据推送到所有队列中。

    【讨论】:

    • OP 说“一种可能的解决方案是为每个消费者线程设置一个唯一的队列,其中相同的数据由生产者推送到多个队列中。有没有更好的方法来做到这一点?” -- 所以似乎在寻求不同的方法。
    • @martineau Lol,完全错过了。
    猜你喜欢
    • 2017-02-24
    • 2011-03-12
    • 2019-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多