【问题标题】:A generic priority queue for PythonPython 的通用优先级队列
【发布时间】:2010-09-29 07:04:11
【问题描述】:

我需要在我的 Python 代码中使用优先级队列,并且:

  • 正在寻找优先级队列的任何快速实现
  • 最理想的情况是,我希望队列是通用的(即适用于具有指定比较运算符的任何对象)。

四处寻找有效的东西,我遇到了heapq,但是:

  • 我正在寻找比 heapq 更快的东西,它是在原生 Python 中实现的,所以它并不快。
  • 看起来不错,但似乎只为整数指定。我想它适用于任何具有比较运算符的对象,但它没有指定它需要哪些比较运算符。
  • 更新:在heapq 中重新比较,我可以按照查理马丁的建议使用(priority, object),或者只为我的对象实现__cmp__

【问题讨论】:

  • heapq 是用 Python 实现的,并不一定意味着它不快。为什么不直接使用它?仅在不能满足您的性能需求时尝试替代方案。

标签: python queue


【解决方案1】:

您可以使用Queue.PriorityQueue

回想一下,Python 不是强类型的,因此您可以保存任何您喜欢的内容:只需创建一个 (priority, thing) 元组即可。

【讨论】:

  • 没有窥视功能 :-(
  • @Eli Bendersky:你有没有在这个和heapq 之间进行性能比较?我会假设 heapq 更快,因为它不做任何锁定。
  • @larsmans 我在 Python2.6 中做了一些简单的测试,表明 heapq 的速度大约是 PriorityQueue 的两倍
  • Queue.PriorityQueue 已同步。对于不需要同步的情况,它会产生不必要的开销。
  • Python 是强类型的。它不是静态的、明显的类型。
【解决方案2】:

在使用优先级队列时,reduce-key是很多算法(Dijkstra's Algorithm, A*, OPTICS)的必备操作,不知道为什么Python内置的优先级队列不支持。其他答案都没有提供支持此功能的解决方案。

一个也支持减少键操作的优先级队列是this Daniel Stutzbach 的实现,它在 Python 3.5 中非常适合我。

from heapdict import heapdict

hd = heapdict()
hd["two"] = 2
hd["one"] = 1
obj = hd.popitem()
print("object:",obj[0])
print("priority:",obj[1])

# object: one
# priority: 1

【讨论】:

  • 似乎是一个合理的答案:反对者应该在这里解释自己。
  • (heapdict)[] 的文档说hd.pop(),但调用heapdict({None: 1}).pop() 会得到TypeError: pop() missing 1 required positional argument: 'key',类似于普通的字典。请改用popitem()[0]...
【解决方案3】:

我最终为heapq 实现了一个包装器,添加了一个字典来保持队列元素的唯一性。结果应该对所有操作员都非常有效:

class PriorityQueueSet(object):

    """
    Combined priority queue and set data structure.

    Acts like a priority queue, except that its items are guaranteed to be
    unique. Provides O(1) membership test, O(log N) insertion and O(log N)
    removal of the smallest item.

    Important: the items of this data structure must be both comparable and
    hashable (i.e. must implement __cmp__ and __hash__). This is true of
    Python's built-in objects, but you should implement those methods if you
    want to use the data structure for custom objects.
    """

    def __init__(self, items=[]):
        """
        Create a new PriorityQueueSet.

        Arguments:
            items (list): An initial item list - it can be unsorted and
                non-unique. The data structure will be created in O(N).
        """
        self.set = dict((item, True) for item in items)
        self.heap = self.set.keys()
        heapq.heapify(self.heap)

    def has_item(self, item):
        """Check if ``item`` exists in the queue."""
        return item in self.set

    def pop_smallest(self):
        """Remove and return the smallest item from the queue."""
        smallest = heapq.heappop(self.heap)
        del self.set[smallest]
        return smallest

    def add(self, item):
        """Add ``item`` to the queue if doesn't already exist."""
        if item not in self.set:
            self.set[item] = True
            heapq.heappush(self.heap, item)

【讨论】:

  • 看起来不错,但您应该使用“item in set”而不是“set.has_key(item)”。它更快(更少的方法调用开销),第二个已在 Python 3.0 中删除。
  • items=[] 是个坏主意,因为列表是可变的。另外,你可以在__init__()中做self.set=set(items)
  • 看起来比docs中提供的实现更简洁。
  • @alecxe 我认为这是因为这不支持decrease-key,而decrease-key 是这些文档中整个“删除”概念的原因(正是“删除”逻辑使得功能看起来不太干净)
【解决方案4】:

您可以将 heapq 用于非整数元素(元组)

from heapq import *

heap = []
data = [(10,"ten"), (3,"three"), (5,"five"), (7,"seven"), (9, "nine"), (2,"two")]
for item in data:
    heappush(heap, item)
sorted = []
while heap:
    sorted.append(heappop(heap))
print sorted
data.sort()
print data == sorted

【讨论】:

  • 这是一个很好的方法,使用它时,添加一个虚拟计数器很有用(总是增加对heappush的每个调用作为元组的第二个元素,这样当两个条目具有相同的优先级时(意味着元组的第一个元素相等),元组根据它们添加的顺序进行排序。这给出了在这种边缘情况下优先级队列的预期结果。
【解决方案5】:

我没用过,但你可以试试PyHeap。它是用 C 语言编写的,所以希望它对你来说足够快。

你确定 heapq/PriorityQueue 不够快吗?可能值得从其中一个开始,然后进行分析以查看它是否真的是您的性能瓶颈。

【讨论】:

    【解决方案6】:

    你看过 heapq 页面上的"Show Source" link 了吗?有一个示例使用带有 (int, char) 元组列表的堆作为优先级队列。

    【讨论】:

    • 我接受纠正(本杰明·彼得森)。 heapq 使用 C 实现,速度很快。
    【解决方案7】:

    这很有效,也适用于字符串或任何类型的输入 -:)

    import itertools
    from heapq import heappush, heappop
    
    pq = []                         # list of entries arranged in a heap
    entry_finder = {}               # mapping of tasks to entries
    REMOVED = '<removed-task>'      # placeholder for a removed task
    counter = itertools.count()     # unique sequence count
    
    def add_task(task, priority=0):
        'Add a new task or update the priority of an existing task'
        if task in entry_finder:
            remove_task(task)
        count = next(counter)
        entry = [priority, count, task]
        entry_finder[task] = entry
        heappush(pq, entry)
    
    def remove_task(task):
        'Mark an existing task as REMOVED.  Raise KeyError if not found.'
        entry = entry_finder.pop(task)
        entry[-1] = REMOVED
    
    def pop_task():
        'Remove and return the lowest priority task. Raise KeyError if empty.'
        while pq:
            priority, count, task = heappop(pq)
            if task is not REMOVED:
                del entry_finder[task]
                return task
        raise KeyError('pop from an empty priority queue')
    

    参考: http://docs.python.org/library/heapq.html

    【讨论】:

    • 与 4 月 1 日发布的答案保持一致,使用 object() 作为标记/已删除。
    【解决方案8】:

    我正在 python 3 中使用queue.PriorityQueue 像这样实现priority queue-

    from queue import PriorityQueue
    
    class PqElement(object):
        def __init__(self, value: int):
            self.val = value
    
        #Custom Compare Function (less than or equsal)
        def __lt__(self, other):
            """self < obj."""
            return self.val > other.val
    
        #Print each element function
        def __repr__(self):
            return f'PQE:{self.val}'
    
    #Usage-
    pq = PriorityQueue()
    pq.put(PqElement(v))       #Add Item      - O(Log(n))
    topValue = pq.get()        #Pop top item  - O(1)
    topValue = pq.queue[0].val #Get top value - O(1)
    

    【讨论】:

    • 似乎是最大队列而不是最小队列? self.val &gt; other.val 如果没有,你能解释一下这里的极性吗?谢谢
    • 你是对的@WestCoastProjects,我已经给出了我的工具,以便任何人都可以根据需要对其进行修改。如果将__lt__ 函数从self.val &gt; other.val 更改为self.val &lt; other.val,那么它将是最小队列。
    【解决方案9】:

    我在https://pypi.python.org/pypi/fibonacci-heap-mod 有一个优先级队列/斐波那契堆

    它并不快(delete-min 上的大常数 c,即 O(c*logn))。但是 find-min、insert、reduce-key 和 merge 都是 O(1) - IOW,它很懒。

    如果在 CPython 上太慢,您可以尝试 Pypy、Nuitka 甚至 CPython+Numba :)

    【讨论】:

      【解决方案10】:

      我可以按照查理·马丁的建议使用(priority, object),或者只为我的对象实现__cmp__

      如果您希望根据特定规则对插入的对象进行优先级排序,我发现编写一个简单的PriorityQueue 子类非常有帮助,它接受一个键函数。您不必手动插入(priority, object) 元组,处理起来感觉更自然。

      所需行为的演示

      >>> h = KeyHeap(sum)
      >>> h.put([-1,1])
      >>> h.put((-1,-2,-3))
      >>> h.put({100})
      >>> h.put([1,2,3])
      >>> h.get()
      (-1, -2, -3)
      >>> h.get()
      [-1, 1]
      >>> h.get()
      [1, 2, 3]
      >>> h.get()
      set([100])
      >>> h.empty()
      True
      >>>
      >>> k = KeyHeap(len)
      >>> k.put('hello')
      >>> k.put('stackoverflow')
      >>> k.put('!')
      >>> k.get()
      '!'
      >>> k.get()
      'hello'
      >>> k.get()
      'stackoverflow'
      

      Python 2 代码

      from Queue import PriorityQueue
      
      class KeyHeap(PriorityQueue):
          def __init__(self, key, maxsize=0):            
              PriorityQueue.__init__(self, maxsize)
              self.key = key
      
          def put(self, x):
              PriorityQueue.put(self, (self.key(x), x))
      
          def get(self):
              return PriorityQueue.get(self)[1]
      

      Python 3 代码

      from queue import PriorityQueue
      
      class KeyHeap(PriorityQueue):
          def __init__(self, key, maxsize=0):            
              super().__init__(maxsize)
              self.key = key
      
          def put(self, x):
              super().put((self.key(x), x))
      
          def get(self):
              return super().get()[1]
      

      显然,如果您尝试插入您的键函数无法处理的对象,调用 put 将(并且应该!)引发错误。

      【讨论】:

        【解决方案11】:

        如果您想保持整个列表有序,而不仅仅是最高值,我在多个项目中使用了此代码的一些变体,它是替换标准 list 类的一个下降,具有类似的 api:

        import bisect
        
        class OrderedList(list):
            """Keep a list sorted as you append or extend it
        
            An ordered list, this sorts items from smallest to largest using key, so
            if you want MaxQueue like functionality use negative values: .pop(-1) and
            if you want MinQueue like functionality use positive values: .pop(0)
            """
            def __init__(self, iterable=None, key=None):
                if key:
                    self.key = key
                self._keys = []
                super(OrderedList, self).__init__()
                if iterable:
                    for x in iterable:
                        self.append(x)
        
            def key(self, x):
                return x
        
            def append(self, x):
                k = self.key(x)
                # https://docs.python.org/3/library/bisect.html#bisect.bisect_right
                i = bisect.bisect_right(self._keys, k)
                if i is None:
                    super(OrderedList, self).append((self.key(x), x))
                    self._keys.append(k)
                else:
                    super(OrderedList, self).insert(i, (self.key(x), x))
                    self._keys.insert(i, k)
        
            def extend(self, iterable):
                for x in iterable:
                    self.append(x)
        
            def remove(self, x):
                k = self.key(x)
                self._keys.remove(k)
                super(OrderedList, self).remove((k, x))
        
            def pop(self, i=-1):
                self._keys.pop(i)
                return super(OrderedList, self).pop(i)[-1]
        
            def clear(self):
                super(OrderedList, self).clear()
                self._keys.clear()
        
            def __iter__(self):
                for x in super(OrderedList, self).__iter__():
                    yield x[-1]
        
            def __getitem__(self, i):
                return super(OrderedList, self).__getitem__(i)[-1]
        
            def insert(self, i, x):
                raise NotImplementedError()
            def __setitem__(self, x):
                raise NotImplementedError()
            def reverse(self):
                raise NotImplementedError()
            def sort(self):
                raise NotImplementedError()
        

        它可以默认处理像(priority, value)这样的元组,但你也可以像这样自定义它:

        class Val(object):
            def __init__(self, priority, val):
                self.priority = priority
                self.val = val
        
        h = OrderedList(key=lambda x: x.priority)
        
        h.append(Val(100, "foo"))
        h.append(Val(10, "bar"))
        h.append(Val(200, "che"))
        
        print(h[0].val) # "bar"
        print(h[-1].val) # "che"
        

        【讨论】:

          【解决方案12】:

          如果您只有一个“更高优先级”级别而不是queue.PriorityQueue 支持的任意多个,您可以通过在左侧插入普通作业.appendleft() 来有效地使用collections.deque,并插入您的更高优先级- 右侧优先条目.append()

          queue 和 deque 实例都有线程安全的 push/pop 方法

          双端队列的其他优势

          • 允许查看任意元素(可索引和可​​迭代而不会弹出,而队列实例只能弹出)
          • 明显快于queue.PriorityQueue(请参阅下面的粗略测试)

          关于长度限制的注意事项

          • 设置长度将使其将元素从任一端推出,而不仅仅是从左侧推出,这与队列实例不同,它会阻塞或引发queue.Full
          • 如果输入速率超过消耗,任何无限制的集合最终都会导致系统内存不足
          import threading
          from collections import deque as Deque
          
          Q = Deque()  # don't set a maximum length
          
          def worker_queue_creator(q):
              sleepE = threading.Event()  # use wait method for sleeping thread
              sleepE.wait(timeout=1)
          
              for index in range(3):  # start with a few jobs
                  Q.appendleft("low job {}".format(index))
          
              Q.append("high job 1")  # add an important job
          
              for index in range(3, 3+3):  # add a few more jobs
                  Q.appendleft("low job {}".format(index))
          
              # one more important job before ending worker
              sleepE.wait(timeout=2)
              Q.append("high job 2")
          
              # wait while the consumer worker processes these before exiting
              sleepE.wait(timeout=5)
          
          def worker_queue_consumer(q):
              """ daemon thread which consumes queue forever """
              sleepE = threading.Event()  # use wait method for sleeping thread
              sleepE.wait(timeout=1)  # wait a moment to mock startup
              while True:
                  try:
                      pre_q_str = str(q)  # see what the Deque looks like before before pop
                      job = q.pop()
                  except IndexError:  # Deque is empty
                      pass            # keep trying forever
                  else:  # successfully popped job
                      print("{}: {}".format(job, pre_q_str))
                  sleepE.wait(timeout=0.4)  # quickly consume jobs
          
          
          # create threads to consume and display the queue
          T = [
              threading.Thread(target=worker_queue_creator, args=(Q,)),
              threading.Thread(target=worker_queue_consumer, args=(Q,), daemon=True),
          ]
          
          for t in T:
              t.start()
          
          T[0].join()  # wait on sleep in worker_queue_creator to quit
          
          % python3 deque_as_priorityqueue.py
          high job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0', 'high job 1'])
          low job 0: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0'])
          low job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1'])
          low job 2: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2'])
          low job 3: deque(['low job 5', 'low job 4', 'low job 3'])
          high job 2: deque(['low job 5', 'low job 4', 'high job 2'])
          low job 4: deque(['low job 5', 'low job 4'])
          low job 5: deque(['low job 5'])
          

          比较

          import timeit
          
          NUMBER = 1000
          
          values_builder = """
          low_priority_values  = [(1, "low-{}".format(index)) for index in range(5000)]
          high_priority_values = [(0, "high-{}".format(index)) for index in range(1000)]
          """
          
          deque_setup = """
          from collections import deque as Deque
          Q = Deque()
          """
          deque_logic_input = """
          for item in low_priority_values:
              Q.appendleft(item[1])  # index into tuples to remove priority
          for item in high_priority_values:
              Q.append(item[1])
          """
          deque_logic_output = """
          while True:
              try:
                  v = Q.pop()
              except IndexError:
                  break
          """
          
          queue_setup = """
          from queue import PriorityQueue
          from queue import Empty
          Q = PriorityQueue()
          """
          queue_logic_input = """
          for item in low_priority_values:
              Q.put(item)
          for item in high_priority_values:
              Q.put(item)
          """
          
          queue_logic_output = """
          while True:
              try:
                  v = Q.get_nowait()
              except Empty:
                  break
          """
          
          # abuse string catenation to build the setup blocks
          results_dict = {
              "deque input":  timeit.timeit(deque_logic_input, setup=deque_setup+values_builder, number=NUMBER),
              "queue input":  timeit.timeit(queue_logic_input, setup=queue_setup+values_builder, number=NUMBER),
              "deque output": timeit.timeit(deque_logic_output, setup=deque_setup+values_builder+deque_logic_input, number=NUMBER),
              "queue output": timeit.timeit(queue_logic_output, setup=queue_setup+values_builder+queue_logic_input, number=NUMBER),
          }
          
          for k, v in results_dict.items():
              print("{}: {}".format(k, v))
          

          结果(推送和弹出 6000 个元素,timeit number=1000

          % python3 deque_priorityqueue_compare.py
          deque input: 0.853059
          queue input: 24.504084000000002
          deque output: 0.0013576999999997952
          queue output: 0.02025689999999969
          

          虽然这是展示双端队列性能的虚构示例,但PriorityQueue 的插入时间是significant function of its length and O(log n) or worse,而Deque is O(1),因此它应该相当代表真实用例

          【讨论】:

            【解决方案13】:

            一个简单的工具:

            因为PriorityQueue 是第一个较低的。

            from queue import PriorityQueue
            
            
            class PriorityQueueWithKey(PriorityQueue):
                def __init__(self, key=None, maxsize=0):
                    super().__init__(maxsize)
                    self.key = key
            
                def put(self, item):
                    if self.key is None:
                        super().put((item, item))
                    else:
                        super().put((self.key(item), item))
            
                def get(self):
                    return super().get(self.queue)[1]
            
            
            a = PriorityQueueWithKey(abs)
            a.put(-4)
            a.put(-3)
            print(*a.queue)
            

            【讨论】:

              猜你喜欢
              • 2013-02-13
              • 1970-01-01
              • 2016-09-23
              • 2023-04-02
              • 1970-01-01
              • 2011-12-20
              • 1970-01-01
              • 1970-01-01
              • 2011-03-20
              相关资源
              最近更新 更多