【问题标题】:Signal the end of jobs on the Queue?指示队列上的作业结束?
【发布时间】:2013-10-23 15:35:14
【问题描述】:

这是来自 Python 文档的示例代码:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

我修改它以适合我的用例,如下所示:

import threading
from Queue import Queue

max_threads = 10

q = Queue(maxsize=max_threads + 2)

def worker():
  while True:
    task = q.get(1)
    # do something with the task
    q.task_done()

for i in range(max_threads):
  t = threading.Thread(target=worker)
  t.start()

for task in ['a', 'b', 'c']:
  q.put(task)

q.join()

当我执行它时,调试器说所有作业都已执行,但 q.join() 似乎永远等待。如何向已发送所有任务的工作线程发送信号?

【问题讨论】:

  • 最新的可能错字:当循环变量为tasks 时,您是否打算将task 放入队列中?我不这么认为...
  • 是的,一个错字。谢谢,我编辑了。
  • 排队包含thread.exit()的任务?

标签: python multithreading queue message-queue python-multithreading


【解决方案1】:

q.join() 实际返回。您可以通过在 q.join() 行之后放置 print("done") 来测试它。

....
q.join()
print('done')

那么,为什么程序不结束呢? 因为,默认情况下,线程是非守护线程。

您可以使用<thread_object>.daemon = True 将线程设置为守护线程

for i in range(max_threads):
    t = threading.Thread(target=worker)
    t.daemon = True # <---
    t.start()

根据threading module documentation

守护进程

一个布尔值,指示此线程是否为守护线程 (真)与否(假)。这必须在调用 start() 之前设置, 否则引发 RuntimeError。它的初始值继承自 创建线程;主线程不是守护线程并且 因此在主线程中创建的所有线程默认为 daemon = 假的。

当没有存活的非守护线程时整个 Python 程序退出 离开了。

2.6 版中的新功能。

【讨论】:

  • 我确实注意到了一段时间,并将其与文档中的代码进行了比较。谢谢! ;)
  • 在这个例子中,守护进程工作线程是否会在 Python 主进程终止后永远存在?还是以某种方式清理干净?
  • @TravisBear,如果只有主线程和守护线程,主线程结束,程序结束。但是如果有任何非守护线程,它不会结束。
【解决方案2】:

此进程未在 .join() 完成,因为工作线程继续等待新的队列数据(阻塞 .get()

这是一个使用简单标志finishUp 告诉工人退出的方法,我们在.join() 完成后设置它 - 意味着所有任务都已处理。我在q.get() 调用中添加了一个超时,以允许它检查finishUp 标志

import threading
import queue

max_threads = 5
q = queue.Queue(maxsize=max_threads + 2)
finishUp = False

def worker():
    while True:
        try:
            task = q.get(block=True, timeout=1)
            # do something with the task
            print ("processing task for:"+str(task))
            q.task_done()
        except Exception as ex: # we get this exception when queue is empty
            if finishUp:
                print ("thread finishing because processing is done")
                return

for i in range(max_threads):
  t = threading.Thread(target=worker)
  t.start()

for task in ['a', 'b', 'c']:
  q.put(task)

print ("waiting on join")
q.join()
finishUp = True  # let the workers know that they can exit
print ("finished")

这会产生以下输出:

waiting on join
processing task for:a
processing task for:b
processing task for:c
finished
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done

Process finished with exit code 0

【讨论】:

    【解决方案3】:

    我定义了一个DONE 对象来表示工作结束:

    DONE = object()
    

    当上层知道没有更多数据到来时,将其放入队列中:

    q.put_nowait(DONE)
    

    在工作线程中,一旦收到对象,线程就退出。 但是如果有其他线程在同一个队列上监听,我们必须把对象放回队列中:

    item = q.get()
    if item is DONE:
        q.put_nowait(DONE)
        return
    

    干杯:)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-04-06
      • 2018-09-11
      • 1970-01-01
      • 2015-07-07
      • 1970-01-01
      • 1970-01-01
      • 2022-10-18
      相关资源
      最近更新 更多