【发布时间】:2011-04-06 00:47:47
【问题描述】:
我正在学习使用队列模块,但对于如何让队列消费者线程知道队列已完成有点困惑。理想情况下,我想在消费者线程中使用get(),如果队列被标记为“完成”,它会抛出异常。有没有比附加一个标记值来标记队列中的最后一项更好的方法来传达这一点?
【问题讨论】:
标签: python multithreading queue
我正在学习使用队列模块,但对于如何让队列消费者线程知道队列已完成有点困惑。理想情况下,我想在消费者线程中使用get(),如果队列被标记为“完成”,它会抛出异常。有没有比附加一个标记值来标记队列中的最后一项更好的方法来传达这一点?
【问题讨论】:
标签: python multithreading queue
基于Glenn Maynard 和其他一些suggestions(谢谢!),我决定汇总实现close 方法的Queue.Queue 的后代。它以原始(未打包)module 的形式提供。当我有更多时间时,我会清理一下并妥善包装它。目前该模块仅包含CloseableQueue 类和Closed 异常类。我计划将其扩展为还包括Queue.LifoQueue 和Queue.PriorityQueue 的子类。
它目前处于相当初步的状态,也就是说,虽然它通过了它的测试套件,但我还没有真正将它用于任何事情。你的旅费可能会改变。我会通过令人振奋的消息不断更新这个答案。
CloseableQueue 类与 Glenn 的建议略有不同,因为关闭队列将阻止未来的puts,但不会阻止未来的gets,直到队列清空。这对我来说最有意义;似乎可以将清除队列的功能添加为与可关闭性功能正交的单独的 mixin*。所以基本上使用CloseableQueue,通过关闭队列,您表明最后一个元素是put。还有一个选项可以通过将last=True 传递给最终的put 调用来自动执行此操作。队列清空后对put 的后续调用,以及对get 的后续调用,以及与这些描述匹配的未完成的阻塞调用,将引发Closed 异常。
这对于单个生产者为一个或多个消费者生成数据的情况非常有用,但对于消费者正在等待特定项目或一组项目的多方安排也很有用。特别是它没有提供一种方法来确定所有生产者是否都已完成生产。要想实现这一点,需要提供某种方式来注册生产者 (.open()?),以及一种表明生产者注册本身已关闭的方法。
非常欢迎提出建议和/或代码审查。我还没有写很多并发代码,但希望测试套件足够彻底,代码通过它的事实表明代码的质量,而不是套件缺乏质量。我能够重用队列模块的测试套件中的一堆代码:文件本身包含在这个模块中,并用作各种子类和例程的基础,包括回归测试。这可能(希望)有助于避免测试部门完全无能。代码本身只是覆盖Queue.get 和Queue.put 并进行了相当小的更改,并添加了close 和closed 方法。
我有意避免在代码本身和测试套件中使用任何新奇的东西,如上下文管理器,以保持代码与 Queue 模块本身一样向后兼容,这是相当可观的确实倒退了。我可能会在某个时候添加__enter__ 和__exit__ 方法;否则,contextlib 的 closing 函数应该适用于 CloseableQueue 实例。
*:这里我松散地使用术语“mixin”。由于Queue 模块的类是旧式的,因此需要使用类工厂函数来混合混合;有一些限制;在 Guido 禁止的情况下提供无效。
CloseableQueue 模块现在提供 CloseableLifoQueue 和 CloseablePriorityQueue 类。我还添加了一些便利功能来支持迭代。仍然需要将其作为适当的包进行返工。有一个类工厂函数可以方便地对其他 Queue.Queue 派生类进行子类化。
CloseableQueue 现在可以通过PyPI 获得,例如与
$ easy_install CloseableQueue
欢迎评论和批评,尤其是来自此答案的匿名投票者。
【讨论】:
队列本身并没有完成或完成的想法。它们可以无限期使用。完成后要关闭它,您确实需要在最后放置 None 或其他一些神奇的值,并编写逻辑来检查它,如您所述。理想的方法可能是继承 Queue 对象。
请参阅http://en.wikipedia.org/wiki/Queue_(data_structure),了解有关队列的更多信息。
【讨论】:
is 检查时它将具有唯一标识。除非使用multiprocessing,否则这似乎会很好用,在这种情况下,我想我需要生成一个 UUID 或其他东西。
QueueFinished = object()。将该对象排队,并使用item is QueueFinished 对其进行测试。
哨兵是关闭队列的一种自然方式,但有几点需要注意。
首先,记住你可能有多个consumer,所以你需要为每个运行的consumer发送一次sentinel,并保证每个consumer只会消费一个sentinel,保证每个consumer都能收到它的shutdown sentinel。
其次,请记住 Queue 定义了一个接口,并且在可能的情况下,代码的行为应该与底层 Queue 无关。您可能有一个 PriorityQueue,或者您可能有一些其他类,它们公开相同的接口并以其他顺序返回值。
不幸的是,很难同时处理这两个问题。为了处理不同队列的一般情况,正在关闭的消费者必须在收到其关闭标记后继续消费值,直到队列为空。这意味着它可能会消耗另一个线程的哨兵。这是 Queue 接口的一个弱点:它应该有一个 Queue.shutdown 调用来导致所有消费者抛出一个异常,但是没有这个。
所以,在实践中:
【讨论】:
队列是一个 FIFO(先进先出)寄存器,因此请记住消费者可以比生产者更快。当消费者线程检测到队列为空时,通常会执行以下操作之一:
如果您不希望消费者线程在作业完成后终止,而不是在队列中放入一个哨兵值来终止任务。
【讨论】:
这样做的最佳做法是让队列本身通知客户端它已达到“完成”状态。然后客户可以采取任何适当的行动。
您的建议;检查队列以查看它是否定期完成,这是非常不可取的。轮询是多线程编程中的一种反模式,您应该始终使用通知。
编辑:
所以你说队列本身知道它基于某些标准“完成”并且需要通知客户这一事实。我认为您是正确的,最好的方法是在客户端调用 get() 并且队列处于完成状态时抛出。如果您抛出这将否定客户端对哨兵值的需要。在内部,队列可以以任何它喜欢的方式检测到它已经“完成”,例如队列是空的,它的状态设置为完成等我认为不需要哨兵值。
【讨论】:
putting 一些唯一值,以向接收线程表明它已经结束。我的问题基本上是“让生产者通知其消费者它已经完成的最佳方式是什么?”
close 方法,这将导致所有阻塞和未来的get 调用引发异常;但它没有,所以这对他没有帮助。