【问题标题】:Split a generator into chunks without pre-walking it将生成器拆分成块而不预先遍历它
【发布时间】:2016-04-28 06:37:57
【问题描述】:

(这个问题与this onethis one有关,但这些都是预先遍历生成器,这正是我想要避免的)

我想将生成器拆分成块。要求是:

  • 不要填充块:如果剩余元素的数量小于块大小,则最后一个块必须更小。
  • 不要事先遍历生成器:计算元素的成本很高,而且只能由消费函数完成,不能由分块器完成
  • 这当然意味着:不要在内存中累积(没有列表)

我已经尝试了以下代码:

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        yield head(i, size)

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 3)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

这在某种程度上有效:

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
^CTraceback (most recent call last):
  File "xxxx.py", line 15, in <module>
    for el in chunk:
  File "xxxx.py", line 2, in head
    for cnt, el in enumerate(iterable):
KeyboardInterrupt

Buuuut ...因为while True,它永远不会停止(我必须按^C)。每当生成器被消耗时,我想停止该循环,但我不知道如何检测这种情况。我已经尝试提出异常:

class NoMoreData(Exception):
    pass

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break
    if cnt == 0 : raise NoMoreData()

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        try:
            yield head(i, size)
        except NoMoreData:
            break

# Sample generator: the real data is much more complex, and expensive to compute    
els = xrange(7)

for n, chunk in enumerate(chunks(els, 2)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

然后异常只在消费者的上下文中引发,这不是我想要的(我想保持消费者代码干净)

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
Traceback (most recent call last):
  File "xxxx.py", line 22, in <module>
    for el in chunk:
  File "xxxx.py", line 9, in head
    if cnt == 0 : raise NoMoreData
__main__.NoMoreData()

如何在 chunks 函数中检测到生成器已耗尽,而无需遍历它?

【问题讨论】:

  • 不知道如何解决它,但 except 只会在 创建 head 时引发异常,而不是在迭代它时捕获异常。
  • @tobias_k:当然,我明白这一点。我正在寻找解决方案...
  • 可以偷看第一个元素吗?您可以尝试 next 第一个元素,然后引发异常或返回实际的块迭代器。
  • @tobias_k:这将是一个很好的折衷方案,但不确定如何在不丢失该元素的情况下实现它...
  • 你能解释一下“预先运行发电机”是什么意思吗?

标签: python generator


【解决方案1】:

一种方法是查看第一个元素(如果有),然后创建并返回实际的生成器。

def head(iterable, max=10):
    first = next(iterable)      # raise exception when depleted
    def head_inner():
        yield first             # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:  # cnt + 1 to include first
                break
    return head_inner()

只需在您的 chunk 生成器中使用它,并像使用自定义异常一样捕获 StopIteration 异常。


更新:这是另一个版本,使用itertools.islice 替换大部分head 函数和for 循环。这个简单的for 循环实际上与原始代码中笨重的while-try-next-except-break 构造完全相同,因此结果更具可读性

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:    # stops when iterator is depleted
        def chunk():          # construct generator for next chunk
            yield first       # yield element from for loop
            for more in islice(iterator, size - 1):
                yield more    # yield more elements from the iterator
        yield chunk()         # in outer generator, yield next chunk

我们可以得到比这更短的,使用itertools.chain 替换内部生成器:

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))

【讨论】:

  • 行得通!谢谢!我有完整的例子:如果你愿意,我可以编辑你的答案以包含它并作为参考。
  • 使用for n, chunk in enumerate(chunks(xrange(0,100), 10)): 的初始示例,这似乎对我不起作用。如果我在那个 for 循环中 print(n),我最终会得到 100 个块。我在这里误解了什么吗?
  • @kadrach 那是因为您在生成下一个之前没有消耗这些块。只有在生成下一个块之前消耗了每个块,该代码才有效;否则,块 n 应该如何知道它的第一个元素是什么,而不实际将迭代器推进到块 n-1 的末尾?
  • 这只是救了我的命
  • 我认为表现得像itertools.groupby 会很好,即不要仅仅因为它们没有被消耗而改变块/组。 Possible implementation.
【解决方案2】:

另一种创建组/块而不是prewalk生成器的方法是在使用itertools.count 对象的键函数上使用itertools.groupby。由于count 对象独立于 iterable,因此无需了解 iterable 的内容即可轻松生成块。

groupby 的每次迭代都会调用count 对象的next 方法,并通过对当前计数值按块的大小。

from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c)//size):
        yield g

每个组/块 g yielded 生成器函数是一个迭代器。但是,由于groupby 对所有组使用共享迭代器,因此组迭代器不能存储在列表或任何容器中,每个组迭代器都应该在下一个之前使用。

【讨论】:

  • 这比上面 tobias_k 发布的 itertools.chain 方法慢 7 到 8 倍
  • 此方法有效。 Tobias_k 的方法不会产生正确的结果。为 0 到 21 的整数创建一个生成器,批量大小为 10。所写的 Tobias_k 方法不会产生三个组。这个方法可以。我不在乎它很慢,我在乎它是否有效。
  • 改进:将c = count()next(c)//size 替换为c = cycle((False,) * size + (True,) * size)partial(next, c)(将导入更改为cycle 而不是count 并添加from functools import partial)。避免按元素划分和按元素创建新的 int 对象(在 tuple 的大小为 2 * size 的常量 cycle 迭代上的开销很小),运行时节省了 35% 以上。使用map(operator.itemgetter(1), ...) 将完全删除每个元素的 Python 代码,但在 3.9 上测试,收益不到 5%;不值得麻烦。
【解决方案3】:

更小n 的更快解决方案(自 2021 年 12 月 2 日起新增):

当块大小通常较小时,最快的解决方案是这个,改编自rhettg's answer

from itertools import takewhile, zip_longest

def chunker(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x

旧答案(仍然很快,但基本上在所有情况下都比上面略逊一筹,在常见情况下大约是 2 倍):

由于(在 CPython 中)使用纯 C 级内置函数,我可以想出最快的解决方案。通过这样做,生成每个块不需要 Python 字节码(除非底层生成器是用 Python 实现的),这具有巨大的性能优势。它会在返回之前遍历每个 chunk,但它不会在即将返回的块之外进行任何预遍历:

# Py2 only to get generator based map
from future_builtins import map

from itertools import islice, repeat, starmap, takewhile
# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument prior to 3.10; in 3.10+, you can
# just use bool (which is trivially faster than truth)
from operator import truth

def chunker(n, iterable):  # n is size of each chunk; last chunk may be smaller
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))

因为有点密集,展开版来说明:

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x

enumerate 中封装对chunker 的调用可以让您在需要时为块编号。

【讨论】:

    【解决方案4】:

    more-itertools已经提供了可以达到目的的chunkedichunked,在Python 3 itertools document page上有提到。

    【讨论】:

      【解决方案5】:

      itertools.islice怎么样:

      import itertools
      
      els = iter(xrange(7))
      
      print list(itertools.islice(els, 2))
      print list(itertools.islice(els, 2))
      print list(itertools.islice(els, 2))
      print list(itertools.islice(els, 2))
      

      这给出了:

      [0, 1]
      [2, 3]
      [4, 5]
      [6]
      

      【讨论】:

      • 谢谢,但不完整。这里的分块器在哪里?
      • 我也考虑过islice,但是您需要一些机制来确定切片何时为空并停止。
      【解决方案6】:

      在制定以更高速度插入 500k+ 行的数据库的解决方案时,开始意识到此方案的有用性。

      生成器处理来自源的数据并逐行“生成”它;然后另一个生成器将输出分组并逐块“输出”。第二个生成器只知道块大小,仅此而已。

      下面是一个突出概念的示例:

      #!/usr/bin/python
      
      def firstn_gen(n):
          num = 0
          while num < n:
              yield num
              num += 1
      
      def chunk_gen(some_gen, chunk_size=7):
          res_chunk = []
          for count, item in enumerate(some_gen, 1):
              res_chunk.append(item)
              if count % chunk_size == 0:
                  yield res_chunk
                  res_chunk[:] = []
          else:
              yield res_chunk
      
      
      if __name__ == '__main__':
          for a_chunk in chunk_gen(firstn_gen(33)):
              print(a_chunk)
      

      在 Python 2.7.12 中测试:

      [0, 1, 2, 3, 4, 5, 6]
      [7, 8, 9, 10, 11, 12, 13]
      [14, 15, 16, 17, 18, 19, 20]
      [21, 22, 23, 24, 25, 26, 27]
      [28, 29, 30, 31, 32]
      

      【讨论】:

        【解决方案7】:

        我遇到了同样的问题,但找到了比这里提到的更简单的解决方案:

        def chunker(iterable, chunk_size):
            els = iter(iterable)
            while True:
                next_el = next(els)
                yield chain([next_el], islice(els, chunk_size - 1))
        
        for i, chunk in enumerate(chunker(range(11), 2)):
            for el in chunk:
                print(i, el)
        
        # Prints the following:
        0 0
        0 1
        1 2
        1 3
        2 4
        2 5
        3 6
        3 7
        4 8
        4 9
        5 10
        

        【讨论】:

          【解决方案8】:
          from itertools import islice
          def chunk(it, n):
              '''
              # returns chunks of n elements each
          
              >>> list(chunk(range(10), 3))
              [
                  [0, 1, 2, ],
                  [3, 4, 5, ],
                  [6, 7, 8, ],
                  [9, ]
              ]
          
              >>> list(chunk(list(range(10)), 3))
              [
                  [0, 1, 2, ],
                  [3, 4, 5, ],
                  [6, 7, 8, ],
                  [9, ]
              ]
              '''
              def _w(g):
                  return lambda: tuple(islice(g, n))
              return iter(_w(iter(it)), ())
          

          【讨论】:

            【解决方案9】:

            Moses Koledoye's answer 的启发,我尝试制作一个使用itertools.groupby 但不需要在每个步骤中划分的解决方案。

            以下函数可以用作 groupby 的键,它只是返回一个布尔值,在预定义的调用次数后翻转。

            def chunks(chunksize=3):
            
                def flag_gen():
                    flag = False
                    while True:
                        for num in range(chunksize):
                            yield flag
                        flag = not flag
            
                flag_iter = flag_gen()
            
                def flag_func(*args, **kwargs):
                    return next(flag_iter)
            
                return flag_func
            

            可以这样使用:

            from itertools import groupby
            
            my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")
            
            chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))
            
            for flag, chunk in chunked_generator:
                print("Flag is {f}".format(f=flag), list(chunk))
            

            输出:

            Flag is False ['a', 'b', 'c', 'd', 'e']
            Flag is True ['f', 'g', 'h', 'i', 'j']
            Flag is False ['k', 'l', 'm', 'n', 'o']
            Flag is True ['p', 'q', 'r', 's', 't']
            Flag is False ['u', 'v', 'w', 'x', 'y']
            Flag is True ['z']
            

            我创建了一个fiddle demonstrating this code

            【讨论】:

              【解决方案10】:

              你说你不希望在内存中存储东西,那么这是否意味着你不能为当前块构建中间列表?

              为什么不遍历生成器并在块之间插入一个哨兵值?消费者(或合适的包装器)可以忽略哨兵:

              class Sentinel(object):
                  pass
              
              def chunk(els, size):
                  for i, el in enumerate(els):
                      yield el
                      if i > 0 and i % size == 0:
                          yield Sentinel
              

              【讨论】:

              • 我不想遍历生成器。我想生成一个生成器列表,只有在消费者决定时才会遍历原始生成器。这样,我可以传递块列表供我的代码的其他部分使用它,而无需预先遍历数据(在我的情况下计算成本很高)
              【解决方案11】:

              使用生成器生成器编辑其他解决方案

              你不应该在你的迭代器中使用while True,而只是简单地遍历它并在每次迭代时更新块号:

              def chunk(it, maxv):
                  n = 0
                  for i in it:
                      yield n // mavx, i
                      n += 1
              

              如果你想要一个生成器的生成器,你可以拥有:

              def chunk(a, maxv):
                  def inner(it, maxv, l):
                      l[0] = False
                      for i in range(maxv):
                          yield next(it)
                      l[0] = True
                      raise StopIteration
                  it = iter(a)
                  l = [True]
                  while l[0] == True:
                      yield inner(it, maxv, l)
                  raise StopIteration
              

              a 是可迭代的。

              测试:在 python 2.7 和 3.4 上:

              for i in chunk(range(7), 3):
                  print 'CHUNK'
                  for a in i:
                      print a
              

              给:

              CHUNK
              0
              1
              2
              CHUNK
              3
              4
              5
              CHUNK
              6
              

              在 2.7 上:

              for i in chunk(xrange(7), 3):
                  print 'CHUNK'
                  for a in i:
                      print a
              

              给出相同的结果。

              但是注意list(chunk(range(7)) 会在 2.7 和 3.4 上阻塞

              【讨论】:

              • 这并没有为我提供一系列块,而是为我提供了一个带有它所属块的指示的迭代器。它预先遍历迭代器,这正是我想要避免的。
              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 2016-08-22
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2016-11-05
              • 2012-01-19
              相关资源
              最近更新 更多