【发布时间】:2012-12-05 23:07:11
【问题描述】:
我已经实现了一个消费者/生产者优先级队列,其中优先级实际上是一个时间戳,表示应该何时交付项目。它工作得很好,但我想知道是否有人有更好的想法来实现这个或关于当前实现的 cmets。
代码在 Python 中。创建单个线程以按时唤醒等待的消费者。我知道这是在库中创建线程的反模式,但我无法设计另一种方法。
代码如下:
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
class Locker:
def __init__(self, lock):
self.l = lock
def __enter__(self):
self.l.acquire()
return self.l
def __exit__(self, type, value, traceback):
self.l.release()
# Optimization to avoid wasting CPU cycles when something
# is about to happen in less than 5 ms.
_RESOLUTION = 0.005
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
def put(self, when, item):
with self.Locker(self.putcond):
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
with self.Locker(self.getcond):
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
with self.Locker(self.putcond):
return len(self.queue)
def run(self):
with self.Locker(self.putcond):
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self._RESOLUTION:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.Locker(self.getcond):
self.getcond.notify()
if __name__ == "__main__":
q = TimelyQueue()
q.start()
N = 50000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
【问题讨论】:
-
我可能会让线程的存在不那么明确,所以对象看起来像
Queue而不是Thread。另外,你为什么要围绕Condition构建一个Locker上下文管理器?见the docs。 -
@abarnert 线程的好主意,我将在构造函数中创建它。关于
Locker我想我从来没有读过那么远的文档:)。感谢您的提示! -
这是否意味着单消费者实现?如果没有,您可能需要考虑
notifyAll,因为可能有多个条目同时到期。更重要的是,您可能需要多线程单元测试。 -
@abarnert 不,多个消费者。你是对的,这是一个错误。我已经在外部编写了一个多线程测试单元,但它太大而无法内联到文件中,而且代码太丑了,我羞于发布它。
-
最后一条不太相关的评论:您可能想查看Queue.py 的源代码以及标准库中的单元测试,以确保您没有错过任何重要的事情。看看你是否真的可以将它构建为
Queue的包装器(就像标准的PriorityQueue)而不是从头开始,这样你就可以免费获得一些额外的功能(主要是maxsize,这很难做对)——尽管一如既往,请记住 YAGNI;这可能不值得。
标签: python priority-queue anti-patterns