【问题标题】:Starting multiple thread processes to process a queue启动多个线程进程来处理一个队列
【发布时间】:2020-12-05 02:32:39
【问题描述】:

使用下面的代码,我开始线程化进程,write_process 写入队列,read_process 从队列中读取:

import time
from multiprocessing import Process, Queue, Pool
class QueueFun():

    def writing_queue(self, work_tasks):
        while True:
            print("Writing to queue")
            work_tasks.put(1)
            time.sleep(1)

    def read_queue(self, work_tasks):
        while True:
            print('Reading from queue')
            work_tasks.get()
            time.sleep(2)


if __name__ == '__main__':
    q = QueueFun()
    work_tasks = Queue()

    write_process = Process(target=q.writing_queue,
                                     args=(work_tasks,))
    write_process.start()

    read_process = Process(target=q.read_queue,
                                     args=(work_tasks,))
    read_process.start()

    write_process.join()
    read_process.join()

运行上面的代码打印:

Writing to queue
Reading from queue
Writing to queue
Reading from queue
Writing to queue
Writing to queue
Reading from queue
Writing to queue

如何启动N个进程从队列中读取?

我尝试使用以下代码启动 3 个进程,但只启动了 1 个进程,这是因为 .join() 阻止了第二个进程启动?:

for i in range(0 , 3):
    read_process = Process(target=q.read_queue,
                                     args=(work_tasks,))
    print('Starting read_process' , i)
    read_process.start()
    read_process.join()

我也考虑过使用Pool,如https://docs.python.org/2/library/multiprocessing.html 中所述,但这似乎与转换现有集合有关:

print pool.map(f, range(10))

如何启动n个线程,每个线程处理一个共享队列?

【问题讨论】:

  • 谷歌'Python生产者消费者队列',尽量不关注'join':(
  • @MartinJames 谢谢,bogotobogo.com/python/Multithread/… 等文章指的是线程模块而不是多处理模块 - 你建议使用线程而不是多处理还是将两者结合起来?

标签: python python-3.x multithreading multiprocessing


【解决方案1】:

你可以把它放到列表中,然后在创建循环之外加入它:

if __name__ == '__main__':
    q = QueueFun()
    work_tasks = Queue()

    write_process = Process(target=q.writing_queue,
                            args=(work_tasks,))
    write_process.start()

    processes = []
    for i in range(0, 5):
        processes.append(Process(target=q.read_queue,
                                 args=(work_tasks,)))

    for p in processes:
        p.start()

    write_process.join()
    for p in processes:
        p.join()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-11-12
    • 2018-10-27
    • 1970-01-01
    • 1970-01-01
    • 2017-06-15
    • 2014-05-09
    • 2016-05-02
    • 2015-04-15
    相关资源
    最近更新 更多