【问题标题】:How to gather results from Python's Thread class with an input and output queue?如何使用输入和输出队列从 Python 的 Thread 类中收集结果?
【发布时间】:2021-09-20 16:47:10
【问题描述】:

我正在尝试学习一些关于 python 中的线程的知识。我知道我可以使用各种其他池和进程或管道,这可能更容易,但我对使用线程模块感兴趣。

from threading import Thread
from queue import Queue

class SimulationThread(Thread):

    def __init__(self, input_queue: Queue, results_queue: Queue):
        Thread.__init__(self)
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self) -> None:
        try:
            data = self.input_queue.get() # will be replaced with simulation data
            self.results_queue.put(data)
        finally:
            return self.input_queue.task_done()

N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()
results_queue = Queue()

for x in range(NP):
    worker = SimulationThread(input_queue, results_queue)
    worker.daemon = True
    worker.start()

for i in range(N):
    input_queue.put(i)

现在我尝试了几种不同的方法来收集结果:

# always returns 0
print(results_queue.get())
#hangs
results_queue.join()
# does nothing, I'm quessing queue is not yet populated
while not results_queue.empty():
    print(results_queue.get())
# prints nothing
ret = results_queue.get()
while ret is None:
    ret = results_queue.get()
    print(ret)
# finally prints out the results, but in order of 1 - 7. No 8 or 9.
ret = results_queue.get()
while ret is not None:
    ret = results_queue.get()
    print(ret)
    if results_queue.empty():
        ret = None

这时我停下来寻求帮助。如何让所有 NP 线程同时处理所有 N 个数字?

【问题讨论】:

    标签: python multithreading queue


    【解决方案1】:

    默认情况下,Queue.get 将在必要时阻止,直到项目可用。您将 10 个项目添加到 input_queue 队列,但随后您只创建了 8 个线程。

    您的线程应该不断地从队列中收集和处理项目,直到它们被停止。你可以试试这样的:

    test.py:

    from queue import Queue
    from threading import Thread
    
    N = 10  # number of simulations to run
    NP = 8  # number of threads to use
    
    
    class SimulationThread(Thread):
        def __init__(self, input_queue, results_queue):
            super().__init__()
    
            self.input_queue = input_queue
            self.results_queue = results_queue
    
        def run(self):
            for data in iter(self.input_queue.get, "STOP"):
                self.results_queue.put(data * 2)
    
    
    def main():
        input_queue = Queue()
        results_queue = Queue()
    
        for i in range(N):
            input_queue.put(i)
    
        for _ in range(NP):
            SimulationThread(input_queue, results_queue).start()
    
        for i in range(N):
            print(i, results_queue.get())
    
        for _ in range(NP):
            input_queue.put("STOP")
    
    
    if __name__ == "__main__":
        main()
    

    测试:

    $ python test.py
    0 0
    1 2
    2 4
    3 6
    4 8
    5 10
    6 12
    7 14
    8 16
    9 18
    

    【讨论】:

    • 感谢您的回答。你能解释一下你用for data in iter(self.input_queue.get, "STOP"):做什么吗?
    • 好的,我想我明白了 - 这里的 "STOP" 是迭代器终止符,所以循环继续直到遇到 "STOP"
    • 是的,docs也有很好的解释。
    【解决方案2】:

    您可能想使用JoinableQueue。每个任务完成其工作后,都会从输入中调用队列中的.task_done()

    然后您的主线程在同一个队列上调用queue.join()。在对task_done() 的调用与添加到队列中的项目一样多之前,这不会返回。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-02
      • 2021-05-05
      相关资源
      最近更新 更多