【问题标题】:How can you feed an iterable to multiple consumers in constant space?如何在恒定空间中向多个消费者提供迭代?
【发布时间】:2020-04-08 10:49:09
【问题描述】:

如何在恒定空间中向多个消费者提供可迭代对象?

TLDR

编写一个在 CONSTANT SPACE 中通过以下测试的实现,同时 将minmaxsum 视为黑盒子。

def testit(implementation, N):
    assert implementation(range(N), min, max, sum) == (0, N-1, N*(N-1)//2)

讨论

我们喜欢迭代器,因为它们让我们可以懒惰地处理数据流, 允许在 CONSTANT SPACE 中处理大量数据。

def source_summary(source, summary):
    return summary(source)

N = 10 ** 8
print(source_summary(range(N), min))
print(source_summary(range(N), max))
print(source_summary(range(N), sum))

每行都需要几秒钟的时间来执行,但使用的内存很少。然而, 它确实需要对源进行 3 次单独的遍历。因此,如果 您的来源是网络连接、数据采集硬件等,除非您将所有数据缓存在某处,从而失去 CONSTANT SPACE 要求。

这是一个演示这个问题的版本

def source_summaries(source, *summaries):
    from itertools import tee
    return tuple(map(source_summary, tee(source, len(summaries)),
                                     summaries))

testit(source_summaries, N)
print('OK')

测试通过,但tee 必须保留所有数据的副本,因此空间使用量从O(1) 上升到O(N)

如何在内存不变的情况下单次遍历得到结果?

当然,有可能通过顶部给出的测试,使用O(1) 空间,通过作弊: 使用测试使用的特定迭代器消费者的知识。但 这不是重点:source_summaries 应该与 any 迭代器一起使用 setcollections.Counter''.join 等消耗品,包括任何 以及将来可能会写的所有内容。实施必须对待他们 作为黑盒子。

明确一点:关于消费者的唯一知识是每个消费者消费一个迭代并返回一个结果。使用有关消费者的任何其他信息都是作弊。

想法

[编辑:我已经发布了这个想法的实现作为答案]

我可以想象一个使用的解决方案(我真的不喜欢)

  • 抢占式线程

  • 将消费者链接到源的自定义迭代器

让我们调用自定义迭代器link

  • 对于每个消费者,运行
result = consumer(<link instance for this thread>)
<link instance for this thread>.set_result(result)

在单独的线程上。

  • 在主线程上,类似于
for item in source:
    for l in links:
        l.push(item)

for l in links:
    l.stop()

for thread in threads:
    thread.join()

return tuple(link.get_result, links)
  • link.__next__ 阻塞,直到 link 实例接收到

    • .push(item) 在这种情况下它会返回项目
    • .stop() 在这种情况下会引发 StopIteration
  • 数据竞赛看起来就像一场噩梦。您需要一个推送队列,并且可能需要将一个哨兵对象放入队列中 link.stop() ... 以及我忽略的其他一些事情。

我更喜欢使用协作线程,但consumer(link) 似乎是 难免不合作。

你有什么不那么凌乱的建议吗?

【问题讨论】:

  • 这些功能必须是怎样的“黑匣子”?会像在reduce 调用中那样计算中间结果吗?这样,您可以初始化tmp = 0,然后在每次迭代中执行tmp = sum(tmp, current_value),而不是计算sum(some_list)。您可以同时对所有三个操作(minmaxsum)执行此操作,并且只需要遍历元素一次。唯一的问题是为三个操作中的每一个选择一个有意义的初始值 tmp
  • @DanielJunglas 完全黑匣子。在等效的二进制函数上使用reduce,需要特定于消费者的知识。因此,它属于我在问题中提到的“作弊”。我想提供(类似的)这个作为一个库实用程序,用户可以使用他们想要的任何消费者调用它,包括今天尚未发明的消费者,所以我可以唯一了解消费者是它消费一个迭代来产生结果。 除此之外的任何事情都是作弊。
  • 您这样做是为了练习还是为了真实世界的图书馆?在后一种情况下,我认为扩展您的函数以便将 tmp 的初始化程序作为参数是有意义的。如果您查看内置的 sum() 函数,那么这正是该函数的作用。这就是您如何使用该函数来总结数字或连接具有相同实现的列表。无论如何,这些只是我的两分钱。
  • @DanielJunglas 可迭代对象的任意消费者的接口是consumer(iterable)没有别的summax 或您心目中的任何其他特定提供更多都没关系,该库只能依赖最低公分母。这在练习和现实世界中都成立:它是“界面”含义的基本属性!
  • @DanielJunglas 我想我可能明白你的一些困惑来自哪里:也许你认为比 所有 消费者都像maxmin 一样超载:max((1,2)) == max(1,2) [注意:第二种情况下的括号更少]。 max 在这方面是不寻常的:即使sum(您在第一个示例中使用)也不能以这种方式使用:sum((1,2)) == 3sum(1,2) 是一个错误。所以你的tmp = sum(tmp, current_value) 的例子也是一个错误。绝大多数都属于这一类:listtuplesetdictcollections.Counterenumeratepartial(map, fn)等。

标签: python generator lazy-sequences


【解决方案1】:

这是您想法的另一种实现方式。它使用协作多线程。正如您所建议的,关键是使用多线程并让迭代器 __next__ 方法阻塞,直到所有线程都消耗完当前迭代。

此外,迭代器包含一个(可选的)恒定大小的缓冲区。有了这个缓冲区,我们可以分块读取源代码并避免大量锁定/同步。

我的实现还处理了一些消费者在到达迭代器末尾之前停止迭代的情况。

import threading

class BufferedMultiIter:
    def __init__(self, source, n, bufsize = 1):
        '''`source` is an iterator or iterable,
        `n` is the number of threads that will interact with this iterator,
        `bufsize` is the size of the internal buffer. The iterator will read
        and buffer elements from `source` in chunks of `bufsize`. The bigger
        the buffer is, the better the performance but also the bigger the
        (constant) space requirement.
        '''
        self._source = iter(source)
        self._n = n
        # Condition variable for synchronization
        self._cond = threading.Condition()
        # Buffered values
        bufsize = max(bufsize, 1)
        self._buffer = [None] * bufsize
        self._buffered = 0
        self._next = threading.local()
        # State variables to implement the "wait for buffer to get refilled"
        # protocol
        self._serial = 0
        self._waiting = 0

        # True if we reached the end of the source
        self._stop = False
        # Was the thread killed (for error handling)?
        self._killed = False

    def _fill_buffer(self):
        '''Refill the internal buffer.'''
        self._buffered = 0
        while self._buffered < len(self._buffer):
            try:
                self._buffer[self._buffered] = next(self._source)
                self._buffered += 1
            except StopIteration:
                self._stop = True
                break
            # Explicitly clear the unused part of the buffer to release
            # references as early as possible
            for i in range(self._buffered, len(self._buffer)):
                self._buffer[i] = None
        self._waiting = 0
        self._serial += 1

    def register_thread(self):
        '''Register a thread.

        Each thread that wants to access this iterator must first register
        with the iterator. It is an error to register the same thread more
        than once. It is an error to access this iterator with a thread that
        was not registered (with the exception of calling `kill`). It is an
        error to register more threads than the number that was passed to the
        constructor.
        '''
        self._next.i = 0

    def unregister_thread(self):
        '''Unregister a thread from this iterator.

        This should be called when a thread is done using the iterator.
        It catches the case in which a consumer does not consume all the
        elements from the iterator but exits early.
        '''
        assert hasattr(self._next, 'i')
        delattr(self._next, 'i')
        with self._cond:
            assert self._n > 0
            self._n -= 1
            if self._waiting == self._n:
                self._fill_buffer()
            self._cond.notify_all()

    def kill(self):
        '''Forcibly kill this iterator.

        This will wake up all threads currently blocked in `__next__` and
        will have them raise a `StopIteration`.
        This function should be called in case of error to terminate all
        threads as fast as possible.
        '''
        self._cond.acquire()
        self._killed = True
        self._stop = True
        self._cond.notify_all()
        self._cond.release()
    def __iter__(self): return self
    def __next__(self):
        if self._next.i == self._buffered:
            # We read everything from the buffer.
            # Wait until all other threads have also consumed the buffer
            # completely and then refill it.
            with self._cond:
                old = self._serial
                self._waiting += 1
                if self._waiting == self._n:
                    self._fill_buffer()
                    self._cond.notify_all()
                else:
                    # Wait until the serial number changes. A change in
                    # serial number indicates that another thread has filled
                    # the buffer
                    while self._serial == old and not self._killed:
                        self._cond.wait()
            # Start at beginning of newly filled buffer
            self._next.i = 0

        if self._killed:
            raise StopIteration
        k = self._next.i
        if k == self._buffered and self._stop:
            raise StopIteration
        value = self._buffer[k]
        self._next.i = k + 1
        return value

class NotAll:
    '''A consumer that does not consume all the elements from the source.'''
    def __init__(self, limit):
        self._limit = limit
        self._consumed = 0
    def __call__(self, it):
        last = None
        for k in it:
            last = k
            self._consumed += 1
            if self._consumed >= self._limit:
                break
        return last

def multi_iter(iterable, *consumers, **kwargs):
    '''Iterate using multiple consumers.

    Each value in `iterable` is presented to each of the `consumers`.
    The function returns a tuple with the results of all `consumers`.

    There is an optional `bufsize` argument. This controls the internal
    buffer size. The bigger the buffer, the better the performance, but also
    the bigger the (constant) space requirement of the operation.

    NOTE: This will spawn a new thread for each consumer! The iteration is
    multi-threaded and happens in parallel for each element.
    '''
    n = len(consumers)
    it = BufferedMultiIter(iterable, n, kwargs.get('bufsize', 1))
    threads = list() # List with **running** threads
    result = [None] * n
    def thread_func(i, c):
        it.register_thread()
        result[i] = c(it)
        it.unregister_thread()
    try:
        for c in consumers:
            t = threading.Thread(target = thread_func, args = (len(threads), c))
            t.start()
            threads.append(t)
    except:
        # Here we should forcibly kill all the threads but there is not
        # t.kill() function or similar. So the best we can do is stop the
        # iterator
        it.kill()
    finally:
        while len(threads) > 0:
            t = threads.pop(-1)
            t.join()
    return tuple(result)

from time import time
N = 10 ** 7
notall1 = NotAll(1)
notall1000 = NotAll(1000)
start1 = time()
res1 = (min(range(N)), max(range(N)), sum(range(N)), NotAll(1)(range(N)),
        NotAll(1000)(range(N)))
stop1 = time()
print('5 iterators: %s %.2f' % (str(res1), stop1 - start1))

for p in range(5):
    start2 = time()
    res2 = multi_iter(range(N), min, max, sum, NotAll(1), NotAll(1000),
                      bufsize = 2**p)
    stop2 = time()
    print('multi_iter%d: %s %.2f' % (p, str(res2), stop2 - start2))

时间再糟糕不过了,但您可以看到使用恒定大小的缓冲区如何显着改善事情:

5 iterators: (0, 9999999, 49999995000000, 0, 999) 0.71
multi_iter0: (0, 9999999, 49999995000000, 0, 999) 342.36
multi_iter1: (0, 9999999, 49999995000000, 0, 999) 264.71
multi_iter2: (0, 9999999, 49999995000000, 0, 999) 151.06
multi_iter3: (0, 9999999, 49999995000000, 0, 999) 95.79
multi_iter4: (0, 9999999, 49999995000000, 0, 999) 72.79

也许这可以作为良好实施的想法来源。

【讨论】:

  • 谢谢!当您提到 cooperative 线程时,您让我非常兴奋……但我只能找到抢占式 :-( [您仍在使用 threading 模块:那是抢占式;我找不到任何 yield 或 asyncio 或任何其他看起来可能实现协作多任务的东西。] 我喜欢线程注销的想法来解决提前终止问题。感谢缓冲区计时,尽管我对它比我的慢得多感到惊讶。 .. 没有分析,我猜 CV 上的阻塞比队列中的要多。但我将把它留到另一天。
  • 一些测量结果:使用微不足道的源和消费者(rangeminmaxmaxsum)但实现稍微慢了一点,我的队列最多只有大约 6500 个元素每个,随着源长度的增长而渐近接近它。因此,对于您计时的最大缓冲区大小 4,可能会有 很多 的阻塞。如果我将sleep(0.001)(1 毫秒)每个项目注入源中,我的实现比普通迭代慢 1%,并且队列长度几乎从不超过 1; 0.1 毫秒延迟 -> 20% 减速,队列长度
  • 我想这取决于“合作”的确切定义。在我的代码中,一个线程始终运行,直到读取完整的缓冲区。此时,它在逻辑上被阻塞并自愿放弃 CPU(合作)。所以在“Python 层”它永远不会被抢占。无论如何,这不是你要找的。但是我注意到你的排队方法还有一点:如果你的线程比 CPU 多,并且有一个线程的优先级低于所有其他线程怎么办?该线程只有在所有其他线程都完成并且该线程的队列将缓冲整个序列后才会运行?
  • 哪里确切地是一个线程“自愿放弃CPU”?您是否将释放 CV 与放弃 CPU 混淆了? threading先发制人地从已锁定 CV 的线程下提取 CPU,其他需要该锁定的线程将不使用 CPU 时间,并且进度会暂停一段时间。我在这里看不到任何协作多任务处理,在 Python 中,这需要 yieldawaitthreading抢占式!)CPU 数量无关紧要:threading 仅使用 1 CPU(因为 GIL;参见multiprocessing)。 threading 中是否存在线程优先级?
  • 对不起,当我提到线程优先级时,我并不清楚我的意思。我的观点是:您的队列实现不满足“恒定空间”要求。线程的队列可能会变得任意大。在最坏的情况下,它必须缓冲输入序列中的所有元素。与线程优先级争论只是如何强制这种行为的一个例子。但是您实际上可以以更简单的方式强制执行此操作:在处理第一个元素之前创建一个执行 time.sleep(60) 的使用者。具有此使用者的线程的队列将缓冲完整的序列。
【解决方案2】:

这是原始问题中概述的抢占式线程解决方案的实现。

[编辑:这个实现有一个严重的问题。 [编辑,现已修复,使用受 Daniel Junglas 启发的解决方案。]

不遍历整个可迭代的消费者将导致Link内的队列中的空间泄漏。例如:


def exceeds_10(iterable):
    for item in iterable:
        if item > 10:
            return True
    return False

如果您将其用作消费者之一并使用源 range(10**6),它将在前 11 个项目之后停止从 Link 内的队列中删除项目,留下大约 10**6 项目在队列中累积!

]


class Link:

    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is FINISHED:
            raise StopIteration
        return item

    def put(self, item):
        self.queue.put(item)

    def stop(self):
        self.queue.put(FINISHED)

    def consumer_not_listening_any_more(self):
        self.__class__ = ClosedLink


class ClosedLink:

    def put(self, _): pass
    def stop(self)  : pass


class FINISHED: pass


def make_thread(link, consumer, future):
    from threading import Thread
    return Thread(target = lambda: on_thread(link, consumer, future))

def on_thread(link, consumer, future):
    future.set_result(consumer(link))
    link.consumer_not_listening_any_more()

def source_summaries_PREEMPTIVE_THREAD(source, *consumers):
    from queue     import SimpleQueue as Queue
    from asyncio   import Future

    links   = tuple(Link(Queue()) for _ in consumers)
    futures = tuple(     Future() for _ in consumers)
    threads = tuple(map(make_thread, links, consumers, futures))

    for thread in threads:
        thread.start()

    for item in source:
        for link in links:
            link.put(item)

    for link in links:
        link.stop()

    for t in threads:
        t.join()

    return tuple(f.result() for f in futures)

它可以工作,但(不出所料)性能会严重下降:

def time(thunk):
    from time import time
    start = time()
    thunk()
    stop  = time()
    return stop - start

N = 10 ** 7
t = time(lambda: testit(source_summaries, N))
print(f'old: {N} in {t:5.1f} s')

t = time(lambda: testit(source_summaries_PREEMPTIVE_THREAD, N))
print(f'new: {N} in {t:5.1f} s')

给予

old: 10000000 in   1.2 s
new: 10000000 in  30.1 s

因此,尽管这是一个理论上的解决方案,但它不是一个实际的解决方案[*]。

因此,我认为这种方法是死路一条,除非有办法说服consumer 合作让步(而不是强迫它先发制人地让步)

def on_thread(link, consumer, future):
    future.set_result(consumer(link))

...但这似乎根本不可能。希望被证明是错误的。

[*] 这实际上有点苛刻:测试对琐碎的数据绝对没有任何作用;如果这是对元素执行大量计算的大型计算的一部分,那么这种方法可能真的很有用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-17
    相关资源
    最近更新 更多