【问题标题】:Running Median in O(log n) time in Python在 Python 中以 O(log n) 时间运行中位数
【发布时间】:2021-10-27 03:15:17
【问题描述】:

我试图在每次操作后找到一组数字的运行中位数,其中操作可以是以下之一:

add x - 在数字集合中添加数字 x

移除 x - 从数字集合中移除数字 x(假设 x 始终存在于集合中)

我的目标是在 O(log n) 时间内执行每个操作。如何在 Python 中实现这一点? Python 中是否有任何内置结构来实现这一点?

如果只允许 add x 操作,一种可能的算法是维护一个 minHeap(存储数字的下半部分)和一个 maxHeap(存储数字的上半部分),如 @ 中所述987654321@。但是如果 remove x 操作也被允许,那么删除一个值将需要 O(n) 时间(O(n) 用于线性搜索堆中的值,O(log n) 用于执行增加/减少按键操作并弹出一个元素)。那么,如何处理 O(log n) 时间内的值删除呢?

附: - 我知道如何在 C++ 中使用多重集来做到这一点。由于它们是作为平衡 BST 实现的,因此所有操作(插入、搜索和删除)都是 O(log n)。

【问题讨论】:

  • 如果您知道平衡的 BST 会给您所需的运行效率,为什么不使用平衡的 BST?
  • 实际上,我正在寻找 Python 中的内置构造(现在更新了问题)。但可以肯定的是,在最坏的情况下,可以自己编写。

标签: python time-complexity median


【解决方案1】:

由于您提到平衡 BST 作为一种可能的解决方案,您可以从任何实现 BST 或订单统计树的第三方模块(例如 Sorted Containers)获得所需的(最坏情况)时间界限。这样做的好处是已经在 O(log n) 时间内提供了添加、删除和任意索引。

如果您的问题是关于如何专门使用两个堆方法(即较小一半的数字使用 maxheap 和较大一半的 minheap)来执行这些操作,则有两个可行的选项来实现删除:

  1. 使用支持 Decrease-Key 的堆类型(例如 Fibonnaci 或 Brodal 堆)并保留指向所有值的指针;这具有删除最坏情况 O(log n) 的好处,但不适用于 Python 的内置 heapq library
  2. 使用延迟删除将元素标记为已删除。这样做的缺点是在摊销时间内只删除 O(log n),最坏的情况是 O(n),但这样做的好处是只使用 heapq 及其二进制堆。

这里是使用 heapq 和延迟删除的 minheap-maxheap 方法的 Python 实现。重建策略类似于通过开放寻址解决哈希冲突的冲突,是在两个堆上执行一次完整传递,并在删除元素的比例超过 50% 时删除所有删除的元素。

import collections
import heapq
import math
from typing import Union, List, Counter


class MedianHeap:
    def __init__(self):
        """ Data structure for holding numeric types, with O(log n) amortized insertion and deletion and
        constant worst case median finding.
        Maintains two heaps:
        a max heap with values smaller than (or eq. to) the median, and a
        min heap with values larger than (or eq. to) the median).
        We use lazy deletion from heaps (rebuilding them if more than ~50% of the heaps are deleted elements)
         to guarantee amortized insertion and deletion.

        The following class invariants are maintained before an external function call starts
        and after that function call ends:
            1. The front of each heap is either infinity (the empty heap sentinel), or a valid element
            2. All elements in self.max_heap have a value less than or equal to all elements in self.min_heap
            3. Size (i.e. undeleted elements) of self.min_heap is 0 or 1 plus the size of self.max_heap"""

        self.min_heap: List[Union[int, float]] = [math.inf]  # Add a sentinel value, to avoid repeated empty checks.
        self.max_heap: List[Union[int, float]] = [math.inf]  # on the left: all elements <= effective median

        self.total_real_elems: int = 0
        self.min_real_elems: int = 0
        self.max_real_elems: int = 0

        self.deleted: Counter[int, int] = collections.Counter()

        # If lazy deletion has caused our data structures to fill too much with deleted elements, trigger a rebuild
        # We rebuild if: (lazy_deletion_multiplier * #deleted) > total_size + lazy_deletion_constant.

        # By default, the multiplier is set at 50%, as is common for open-addressing hash tables which also use
        # lazy deletion. The constant can be increased based on performance needs.
        self.lazy_deletion_multiplier: int = 2
        self.lazy_deletion_constant: int = 500

    def insert(self, num: int) -> None:
        """Insert num into our MedianHeap. May not trigger a full rebuild. O(lg n) worst case time."""
        if not (-math.inf < num < math.inf):
            raise ValueError
        if self.total_real_elems == 0:
            heapq.heappush(self.min_heap, num)
            self.total_real_elems += 1
            self.min_real_elems += 1
            return None

        if num >= self.min_heap[0]:
            heapq.heappush(self.min_heap, num)
            self.min_real_elems += 1
        else:
            heapq.heappush(self.max_heap, -num)
            self.max_real_elems += 1

        self.total_real_elems += 1
        self._rebalance()
        return None

    def remove(self, num: int) -> None:
        """Change the status of one instance of 'num' from active to deleted. O(lg n) amortized, O(n) worst case time.
        num must be an active element in our data structure. May trigger a rebuild."""
        if num >= self.min_heap[0]:
            if num == self.min_heap[0]:
                heapq.heappop(self.min_heap)
            else:
                self.deleted[num] += 1
            self.min_real_elems -= 1
        else:
            if num == -self.max_heap[0]:
                heapq.heappop(self.max_heap)
            else:
                self.deleted[num] += 1
            self.max_real_elems -= 1

        self.total_real_elems -= 1
        self._rebalance()

    def _clean_min_heap_front(self) -> None:
        """While the front of the min_heap was already deleted, remove it from the min_heap"""
        while self.deleted[self.min_heap[0]] > 0:
            self.deleted[heapq.heappop(self.min_heap)] -= 1

    def _clean_max_heap_front(self) -> None:
        """While the front of the max_heap was already deleted, remove it from the max_heap"""
        while self.deleted[-self.max_heap[0]] > 0:
            self.deleted[-heapq.heappop(self.max_heap)] -= 1

    def _rebuild_fully(self) -> None:
        """To guarantee O(log n) amortized insertions and deletions with lazy deletions, we must detect when the number
        of removed elements still in our heap has grown too large: If so, perform an O(n) full rebuild of both heaps
        from scratch, clearing all previously removed elements."""

        # Rebuild heaps, trying to maintain size approximately based on the median
        approx_median: int = self.min_heap[0]

        new_min_heap: List[Union[int, float]] = [math.inf]
        new_max_heap: List[Union[int, float]] = [math.inf]

        for elem in self.max_heap:
            if self.deleted[-elem] > 0:
                self.deleted[-elem] -= 1
                continue
            elif math.isinf(elem):
                continue
            if -elem < approx_median:
                new_max_heap.append(elem)
            elif -elem > approx_median:
                new_min_heap.append(-elem)
            else:
                if len(new_min_heap) - len(new_max_heap) > 1:
                    new_max_heap.append(elem)
                else:
                    new_min_heap.append(-elem)

        for elem in self.min_heap:
            if self.deleted[elem] > 0:
                self.deleted[elem] -= 1
                continue
            elif math.isinf(elem):
                continue

            if elem < approx_median:
                new_max_heap.append(-elem)
            elif elem > approx_median:
                new_min_heap.append(elem)
            else:
                if len(new_min_heap) - len(new_max_heap) > 1:
                    new_max_heap.append(-elem)
                else:
                    new_min_heap.append(elem)

        self.min_heap = new_min_heap
        self.max_heap = new_max_heap

        heapq.heapify(self.min_heap)
        heapq.heapify(self.max_heap)

        self.deleted.clear()
        self.min_real_elems = len(self.min_heap) - 1
        self.max_real_elems = len(self.max_heap) - 1
        self.total_real_elems = self.min_real_elems + self.max_real_elems

        if not (0 <= (self.min_real_elems - self.max_real_elems) <= 1):
            self._rebalance()

    def _need_full_rebuild_check(self) -> bool:
        """Test whether our heaps have a larger fraction of removed elements than allowed"""
        total_size: int = len(self.min_heap) + len(self.max_heap)
        return (self.lazy_deletion_multiplier * (total_size - self.total_real_elems)
               > total_size + self.lazy_deletion_constant)

    def _rebalance(self):
        """ Restore the class invariants:
        1. Front of each heap is infinity (empty heap or sentinel), or a valid element
        2. All elements in self.max_heap have a value <= all elements in self.min_heap
        3. Size (i.e. undeleted elements) of self.min_heap - size of self.max_heap is 0 or 1"""

        if self._need_full_rebuild_check():
            self._rebuild_fully()
            return None

        self._clean_min_heap_front()
        self._clean_max_heap_front()

        while -self.max_heap[0] > self.min_heap[0]:
            if self.min_real_elems - self.max_real_elems <= -1:  # Prefer deleting from max_heap
                self.max_real_elems -= 1
                self.min_real_elems += 1
                heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
                self._clean_max_heap_front()

            else:  # Prefer deleting from min_heap
                self.max_real_elems += 1
                self.min_real_elems -= 1
                heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
                self._clean_min_heap_front()

        while self.min_real_elems - self.max_real_elems <= -1:  # Need to reduce size of max_heap
            self.max_real_elems -= 1
            self.min_real_elems += 1
            heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
            self._clean_max_heap_front()  # Removing front of a heap may place a deleted element in front

        while self.min_real_elems - self.max_real_elems > 1:  # Need to reduce size of min_heap
            self.max_real_elems += 1
            self.min_real_elems -= 1
            heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
            self._clean_min_heap_front()  # Removing front of min_heap may place a deleted element in front

        return None

    def calculate_median(self) -> float:
        """Calculate the median in constant time: the median element(s) are always in a heap's front."""
        if self.total_real_elems == 0:
            raise IndexError

        if self.total_real_elems % 2 == 0:
            return (self.min_heap[0] - self.max_heap[0]) / 2.0
        else:
            return self.min_heap[0]

【讨论】:

  • 我考虑过SortedList,但他们写的是"O(log(n)) – approximate",所以显然不是吗?
  • @don'ttalkjustcode 没错;查看文档,底层实现是限制为 2 级的 B-Tree,因此理论上最佳复杂度为 O(n^1/3)。 performance comparisons 显然表明它比其他公开可用的 Python 的“排序映射”模块更快,因此它可能是纯 Python 中运行中位数问题的最实用解决方案。
猜你喜欢
  • 2017-08-13
  • 1970-01-01
  • 1970-01-01
  • 2011-07-09
  • 2023-03-23
  • 1970-01-01
  • 1970-01-01
  • 2019-09-29
  • 1970-01-01
相关资源
最近更新 更多