【问题标题】:Shutting down daemon thread properly正确关闭守护线程
【发布时间】:2018-04-30 11:04:55
【问题描述】:

我创建了自己的线程类,将在线程池中使用如下:

class SendFilesThread(threading.Thread):
    execute = True

    def __init__(self, port, rate, fileQueue):
        self.daemon = True
        self.fileQueue = fileQueue
        #Other initialization

    def run(self):
        while SendFilesThread.execute == True or not self.fileQueue.empty():
            self.filename = self.fileQueue.get()
            #some file processing
            self.fileQueue.task_done()

    @staticmethod
    def terminate():
        SendFilesThread.execute = False

在程序的主线程中,处理完所有文件后,我尝试关闭线程池,如下所示:

SendFilesThread.terminate()
for t in threadPool:
    if t.is_alive():
        t.join()
exit()

我的理解是,如果我调用join(),它将阻塞调用线程,在这种情况下是主线程,直到加入的线程完成处理。我的问题是,尽管线程完成了,但它永远不会返回到主线程并且程序只是挂起。关闭线程池时我做错了什么吗?

【问题讨论】:

  • 由于你设置了self.daemon = True,当主线程退出时线程会自动终止——所以你不需要join()s。但是,在SendFilesThread.__init__() 中,您忘记使用Thread.__init__()) (或更好的super().__init__())初始化基类,然后再按照documentation 所说的那样做。
  • 实际上,我确实这样做了。可以假设#Other initialization 包括正确初始化线程的所有其他必要步骤。 __init__ 方法中还有许多其他行,我认为这些行只会使问题陷入不必要的代码行。我知道,当主线程终止时,制作treads 守护程序将强制它们终止,这就是我首先将它们制作为守护程序的原因。我的问题是,在所有文件都发送到我的fileQueue 之后,线程需要在主线程关闭之前有时间完成。

标签: python multithreading


【解决方案1】:

有两件事。首先,从设计的角度来看,让execute 标志位于类级别有点奇怪。通常最好将它作为每个类实例上的标志。

其次,你有一个竞争条件。 execute 标志由多个线程访问,并且不受同步原语(例如互斥锁)的保护。这意味着terminate() 调用可以在任何工作线程检查该标志之后运行(并设置标志),但该线程尝试将下一个文件名出列。因为你在没有超时的情况下调用get(),所以工作线程会挂在这里,主线程会阻塞在t.join()调用中。出现死锁。

有很多方法可以解决这个问题。您可以使用线程同步原语(例如互斥锁)来保护execute 标志,或者使用threading.Event 对象代替简单的布尔值。

另一个,在我看来更简单的解决方案是在同一个队列上发送一个“哨兵”值,这表明线程应该退出。看起来您正在发送字符串文件名,因此空字符串可能是一个不错的选择。 (None也常用。)

每个线程的工作循环现在如下所示:

def run(self):
    while True:
        self.filename = self.fileQueue.get()
        if self.filename == '':
            return
        # Process file

terminate() 静态方法不同,主线程为您拥有的每个工作线程放置一个空字符串(即fileQueue.enqueue(''))。

【讨论】:

  • 非常好的点。我认为没有必要将调用同步到我的执行标志,因为它不是数据结构。
  • @nb12345 是的,几乎任何可以被多个线程访问的东西,无论对象类型如何,都容易发生数据竞争。
【解决方案2】:

如果fileQueuemultiprocessing.Queuequeue.Queue,那么它的.get() 方法将阻塞,直到队列中有东西到达。尝试使用.get_nowait(),但如果您希望在队列为空的情况下继续执行,则可能必须将其包装在 try 块中。

try:
    self.filename = self.fileQueue.get_nowait()
except queue.Empty:
    time.sleep(1)

更多信息请参见queue docs

【讨论】:

  • 成功了!谢谢你,我不敢相信我错过了这么小的东西。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-07-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多