【问题标题】:Creating a Python generator that yields ordered products of integers from two large lists创建一个 Python 生成器,从两个大列表中生成整数的有序乘积
【发布时间】:2019-01-07 01:58:00
【问题描述】:

所以,我有两个非常大的数字列表l1l2。我想将l1 的每个元素与l2 的每个元素相乘 明确地创建一个新的产品列表。因此,我想要一个发电机。这部分很容易。我可以做类似的事情

for a in l1:
    for b in l2:
        yield a * b

但是,我还需要按数量对这些产品进行排序。我想知道是否有一些巧妙的技巧可以对 yield 语句进行排序,以便也可以使用生成器来完成。如果可能的话,在 Python 3 中。谢谢。

【问题讨论】:

  • 您的原始列表是否已排序?
  • @DSM 是的,如果有帮助的话,可以毫无问题地生成它们。
  • @M.Haurer 我在这里可能完全错了,但在我看来你的要求似乎很奇怪。您正在寻求一种在不存储产品的情况下即时创建产品的方法,但同时您希望以某种方式订购它们。我真的不明白你怎么可能对你没有存储在一起的项目进行排序。
  • @scharette 这些要求是实用的,并且来自内存/计算限制。直观地说,如果原始列表是有序的,那么似乎可以从最小的条目开始,并以比整个列表短得多的某种程度的前瞻来移动。如果我让你从两个排序列表的元素中找出最小的乘积,你会很容易告诉我它一定是前两个元素的乘积。我只是想将此方法推广到第 n 个最小的元素。
  • 列表可以包含重复项吗?它们可以包含零值或负值吗?这两个列表的长度是否相同?

标签: python python-3.x iterator generator


【解决方案1】:

似乎没有其他方法可以在不创建列表的情况下对这些输出进行排序,因为如果不存储输出就无法排序。以下是你可以做到的。

myList = []

for i in range(len(l1)):
    for j in range(len(l2)):
        output = l1[i] * l2[j]
        myList.append(output)
myList.sort()
print(myList)

希望对您有所帮助。

【讨论】:

  • 首先,print(myList.sort()) 将打印 None
  • 很抱歉。请编辑帖子以提高其准确性,我会记住这一点以供将来回答。
  • 编辑者往往不愿意更改问题或答案中的代码,因为担心它可能会改变帖子的意图。原作者自己修复通常会更好:-)
  • 我明白了。我会尽快修复答案。
  • 是的,这使产品按顺序排列,但它实际上并没有回答问题,因为 OP 明确表示他们想要这样做“没有明确创建一个新的产品清单”。
【解决方案2】:

我将调用列表xsys,并假设它们已排序。正如您在评论中指出的那样,最小的产品必然是 xs[0] * ys[0] - 但前提是您还假设所有数字都是非负数,所以我也会假设。

在第一个产品之后,它变得更加混乱 - 否则你已经解决了它;-) 接下来要考虑的两个是 xs[0] * ys[1]xs[1] * ys[0]。很容易,但是接下来要考虑的下一个取决于哪些获胜。如果xs[0] * ys[1] 赢了,你只需要用xs[0] * ys[2] 替换它,但如果xs[1] * ys[0] 赢了,那么xs[1] * ys[1]xs[2] * ys[0] 都会发挥作用。以此类推。

以下内容跟踪堆中不断增长的可能性集。堆中的项目永远不会超过len(xs),因此代码首先安排使xs 成为更短的列表:

def upprod(xs, ys):
    # xs and ys must be sorted, and non-negative
    from heapq import heappush, heappop
    # make xs the shorter
    if len(ys) < len(xs):
        xs, ys = ys, xs
    if not xs:
        return
    lenxs = len(xs)
    lenys = len(ys)
    # the heap holds 4-tuples:
    #     (product, xs index, ys index, xs[xs index])
    h = [(xs[0] * ys[0], 0, 0, xs[0])]
    while h:
        prod, xi, yi, x = heappop(h)
        yield prod
        # same x with next y
        yi += 1
        if yi < lenys:
            heappush(h, (x * ys[yi], xi, yi, x))
        # if this is the first time we used x, start
        # the next x going
        if yi == 1:
            xi += 1
            if xi < lenxs:
                x = xs[xi]
                heappush(h, (x * ys[0], xi, 0, x))

如果存在本质上更有效的解决方案,我会感到非常惊喜。如果有人认为他们有,请先使用这个随机测试仪进行尝试:

from itertools import product
from random import randrange
MAXLEN = 10
UB = 1000
ntest = 0
while True:
    ntest += 1
    lenxs = randrange(MAXLEN + 1)
    lenys = randrange(MAXLEN + 1)
    xs = sorted(randrange(UB) for i in range(lenxs))
    ys = sorted(randrange(UB) for i in range(lenys))
    brute = sorted(a*b for a, b in product(xs, ys))
    got = list(upprod(xs, ys))
    if brute != got:
        print("OUCH!")
        print(xs)
        print(ys)
        print(brute)
        print(got)
        assert False
    if ntest % 10_000 == 0:
        print(f"finished test {ntest:,}")

编辑 - 从某种意义上说理论上更好;-)

以上内容并没有完全利用我们可以仅从索引中推断出的偏序:如果

i1 <= i2 and j1 <= j2

那么我们就知道了

xs[i1] * ys[j1] <= xs[i2] * ys[j2]

因为排序意味着xs[i1] &lt;= xs[i2]ys[j1] &lt;= ys[j2]

因此,例如,如果索引对 (0, 1)(1, 0) 在堆上,并且第二个获胜,则需要将 (2, 0) 添加到堆中,但 (1, 1) 不会:仅索引,我们就知道堆中剩余的(0, 1) 的乘积不大于(1, 1) 的乘积。只有当(0, 1) 也被删除时,(1, 1) 才需要添加。

一般来说,(i, 0) 形式的每一对都有一个直接前任 (i-1, 0)(0, j) 有一个 (0, j-1),而所有其他 (i, j) 有两个直接前任:(i-1, j) 和 @987654353 @。在所有前辈都从堆中取出之前,无需将一对放在堆上。

这导致这段代码看起来“更优雅”,因为更对称:

def upprod(xs, ys):
    # xs and ys must be sorted, and non-negative
    from heapq import heappush, heappop
    # make xs the shorter
    if len(ys) < len(xs):
        xs, ys = ys, xs
    if not xs:
        return
    lenxs = len(xs)
    lenys = len(ys)
    # the heap holds 3-tuples:
    #     (product, xs index, ys index)
    h = [(xs[0] * ys[0], 0, 0)]

    # interior points for which only one immediate predecessor has
    # been processed; there's no need to put them in the heap
    # until their second predecessor has been processed too
    pending = set()

    def add(xi, yi):
        if xi < lenxs and yi < lenys:
            if xi and yi: # if either is 0, only one predecessor
                p = xi, yi
                if p in pending:
                    pending.remove(p)
                else:
                    pending.add(p)
                    return
            heappush(h, (xs[xi] * ys[yi], xi, yi))

    while h:
        prod, xi, yi = heappop(h)
        yield prod
        # same x with next y; and same y with next x
        add(xi, yi + 1)
        add(xi + 1, yi)
    assert not pending

与第一个代码相比,它在许多情况下使堆更小。但是堆操作在堆条目数量上的时间是对数的,而且堆仍然可以增长到len(xs) 条目,所以这算不上什么胜利。它可能会因为两个新函数调用的开销而丢失(而内联那些太丑陋而无法承受)。

【讨论】:

  • 我没有更有效的解决方案,因为它在堆上放置的字节数大约是代码的 10 倍。但它确实设法在更大的列表上运行得更快一些。 ;)
【解决方案3】:

我的解决方案是创建一个生成器列表,乘积矩阵中的每一行一个生成器,然后使用heapq.merge 对这些生成器的输出进行排序。每个生成器在 32 位机器上的大小为 44 字节,因此整个生成器列表仅消耗少量 RAM。

heapq.merge(当没有提供排序键功能时)通过为您传递的每个可迭代对象创建一个 3 元组来工作。该元组包含可迭代的下一个值、可迭代的索引号以及对可迭代的__next__ 方法的引用。它将这些元组放在堆上以执行可迭代值的合并排序。你可以在其 Python source code 中查看详细信息。

因此,我的方法不像 Tim Peters 的解决方案那样节俭,但也不算太简陋,恕我直言。 ;)

def sorted_prod_merge(xs, ys):
    ''' mergesort generators of the rows. '''
    if len(ys) < len(xs):
        xs, ys = ys, xs
    def gen(x):
        for y in ys:
            yield x * y
    yield from merge(*[gen(x) for x in xs])

这里有一些timeit 代码,它显示了sorted_prod_merge、Tim Peters 的解决方案以及我的一些其他版本的运行时间。我使用了 Tim 的变量名来保持代码统一。有趣的是,Tim 的第一个版本的速度大约是他更高级的解决方案的两倍。我的 sorted_prod_row 运行速度很快,但它是一个可怕的 RAM 猪。

timeit 代码使用itertools recipes 中给出的技术来耗尽迭代器:我们将它提供给长度为零的双端队列。 time_test 代码对每次运行 Timer 的 3 个结果进行排序。这是因为 最小 结果是重要的结果,其他值只是表示测试运行时系统中的差异。有关详细信息,请参阅Timer.repeat 文档中的注释。

from heapq import heappush, heappop, merge
from random import seed, randrange
from timeit import Timer
from collections import deque

seed(163)

# Brute force method, as a generator
def sorted_prod_brute(xs, ys):
    yield from sorted(x * y for x in xs for y in ys)

# By Tim Peters
def upprod1(xs, ys):
    # xs and ys must be sorted, and non-negative
    from heapq import heappush, heappop
    # make xs the shorter
    if len(ys) < len(xs):
        xs, ys = ys, xs
    if not xs:
        return
    lenxs = len(xs)
    lenys = len(ys)
    # the heap holds 4-tuples:
    #     (product, xs index, ys index, xs[xs index])
    h = [(xs[0] * ys[0], 0, 0, xs[0])]
    while h:
        prod, xi, yi, x = heappop(h)
        yield prod
        # same x with next y
        yi += 1
        if yi < lenys:
            heappush(h, (x * ys[yi], xi, yi, x))
        # if this is the first time we used x, start
        # the next x going
        if yi == 1:
            xi += 1
            if xi < lenxs:
                x = xs[xi]
                heappush(h, (x * ys[0], xi, 0, x))

# By Tim Peters
def upprod2(xs, ys):
    # xs and ys must be sorted, and non-negative
    from heapq import heappush, heappop
    # make xs the shorter
    if len(ys) < len(xs):
        xs, ys = ys, xs
    if not xs:
        return
    lenxs = len(xs)
    lenys = len(ys)
    # the heap holds 3-tuples:
    #     (product, xs index, ys index)
    h = [(xs[0] * ys[0], 0, 0)]

    # interior points for which only one immediate predecessor has
    # been processed; there's no need to put them in the heap
    # until their second predecessor has been processed too
    pending = set()

    def add(xi, yi):
        if xi < lenxs and yi < lenys:
            doit = True
            if xi and yi: # if either is 0, only one predecessor
                p = xi, yi
                if p in pending:
                    pending.remove(p)
                else:
                    pending.add(p)
                    doit = False
            if doit:
                heappush(h, (xs[xi] * ys[yi], xi, yi))
    while h:
        prod, xi, yi = heappop(h)
        yield prod
        # same x with next y; and same y with next x
        add(xi, yi + 1)
        add(xi + 1, yi)
    assert not pending

def sorted_prod_merge(xs, ys):
    ''' mergesort generators of the rows. '''
    if len(ys) < len(xs):
        xs, ys = ys, xs
    def gen(x):
        for y in ys:
            yield x * y
    yield from merge(*[gen(x) for x in xs])

def sorted_prod_row(xs, ys):
    ''' Heapsort, row by row.
        Fast, but not space-efficient: the maximum 
        heap size grows to almost len(ys) * len(xs)
    '''
    if len(ys) < len(xs):
        xs, ys = ys, xs
    if not xs:
        return
    x, xs = xs[0], xs[1:]
    heap = []
    #big = 0
    for y in ys:
        lo = x * y
        while heap and heap[0] <= lo:
            yield heappop(heap)
        yield lo
        for u in xs:
            heappush(heap, u * y)
        #big = max(big, len(heap))
    #print(big)
    while heap:
        yield heappop(heap)

def sorted_prod_diag(xs, ys):
    ''' Heapsort, going along the diagonals
        50% slower than sorted_prod_row, but more
        space-efficient: the maximum heap size 
        grows to around 0.5 * len(ys) * len(xs)
    '''
    if not (xs and ys):
        return
    lenxs, lenys = len(xs), len(ys)
    heap = []
    #big = 0
    for n in range(lenxs + lenys - 1):
        row = sorted(xs[n - i] * ys[i]
            for i in range(max(0, n + 1 - lenxs), min(lenys, n + 1)))
        lo = row[0]
        while heap and heap[0] <= lo:
            yield heappop(heap)
        yield lo
        for u in row[1:]:
            heappush(heap, u)
        #big = max(big, len(heap))
    #print(big)
    #assert not heap

def sorted_prod_block(xs, ys):
    ''' yield the top left corner, then merge sort
        the top row, the left column and the remaining 
        block. So we end up with max(len(xs), len(ys))
        recursively nested calls to merge(). It's ok
        for small lists, but too slow otherwise.
    '''
    if not (xs and ys):
        return
    x, *xs = xs
    y, *ys = ys
    yield x * y
    row = (y * u for u in xs)
    col = (x * v for v in ys)
    yield from merge(row, col, sorted_prod_block(xs, ys))

def sorted_prod_blockI(xs, ys):
    ''' Similar to sorted_prod_block except we use indexing
        to avoid creating sliced copies of the lists
    '''
    lenxs, lenys = len(xs), len(ys)
    def sorted_block(xi, yi):
        if xi == lenxs or yi == lenys:
            return
        x, y = xs[xi], ys[yi]
        yield x * y
        xi, yi = xi + 1, yi + 1
        row = (xs[i] * y for i in range(xi, lenxs))
        col = (ys[i] * x for i in range(yi, lenys))
        yield from merge(row, col, sorted_block(xi, yi))
    yield from sorted_block(0, 0)

functions = (
    sorted_prod_brute,
    upprod1,
    upprod2,
    sorted_prod_merge,
    #sorted_prod_row,
    sorted_prod_diag,
    #sorted_prod_block,
    #sorted_prod_blockI,
)

UB = 1000

def verify(numtests, maxlen=10):
    print('Verifying. maxlen =', maxlen)
    for k in range(numtests):
        lenxs = randrange(maxlen + 1)
        lenys = randrange(maxlen + 1)
        print(k, ':', lenxs, '*', lenys, '=', lenxs * lenys)
        xs = sorted(randrange(UB) for i in range(lenxs))
        ys = sorted(randrange(UB) for i in range(lenys))
        good = list(sorted_prod_brute(xs, ys))

        for func in functions[1:]:
            result = list(func(xs, ys))
            if result != good:
                print(func.__name__, 'failed!')
    print()

def time_test(loops=20):
    timings = []
    for func in functions:
        # Consume the generator output by feeding it to a zero-length deque
        t = Timer(lambda: deque(func(xs, ys), maxlen=0))
        result = sorted(t.repeat(3, loops))
        timings.append((result, func.__name__))
    timings.sort()
    for result, name in timings:
        print('{:18} : {:.6f}, {:.6f}, {:.6f}'.format(name, *result))
    print()

verify(10, 10)
verify(20, 100)

print('\nTimings')
loops = 8192
minlen = 5
for k in range(6):
    lenxs = randrange(minlen, 2 * minlen)
    lenys = randrange(minlen, 2 * minlen)
    print(k, ':', loops, 'loops.', lenxs, '*', lenys, '=', lenxs * lenys)
    xs = sorted(randrange(UB) for i in range(lenxs))
    ys = sorted(randrange(UB) for i in range(lenys))
    time_test(loops)
    minlen *= 2
    loops //= 4

这是我古老的 2GHz 32 位单核机器上的输出,在 Linux 的旧 Debian 衍生发行版上运行 Python 3.6.0。 YMMV。

Verifying. maxlen = 10
0 : 8 * 9 = 72
1 : 9 * 0 = 0
2 : 1 * 7 = 7
3 : 8 * 10 = 80
4 : 10 * 5 = 50
5 : 10 * 0 = 0
6 : 5 * 2 = 10
7 : 5 * 10 = 50
8 : 3 * 0 = 0
9 : 0 * 6 = 0

Verifying. maxlen = 100
0 : 64 * 0 = 0
1 : 77 * 96 = 7392
2 : 24 * 13 = 312
3 : 53 * 39 = 2067
4 : 74 * 39 = 2886
5 : 92 * 97 = 8924
6 : 31 * 48 = 1488
7 : 39 * 17 = 663
8 : 42 * 25 = 1050
9 : 94 * 25 = 2350
10 : 82 * 83 = 6806
11 : 2 * 97 = 194
12 : 90 * 30 = 2700
13 : 93 * 24 = 2232
14 : 91 * 37 = 3367
15 : 24 * 86 = 2064
16 : 70 * 15 = 1050
17 : 2 * 4 = 8
18 : 72 * 58 = 4176
19 : 25 * 84 = 2100


Timings
0 : 8192 loops. 7 * 8 = 56
sorted_prod_brute  : 0.659312, 0.665853, 0.710947
upprod1            : 1.695471, 1.705061, 1.739299
sorted_prod_merge  : 1.990161, 1.991129, 2.001242
sorted_prod_diag   : 3.013945, 3.018927, 3.053115
upprod2            : 3.582396, 3.586332, 3.622949

1 : 2048 loops. 18 * 16 = 288
sorted_prod_brute  : 0.826128, 0.840111, 0.863559
upprod1            : 2.240931, 2.241636, 2.244615
sorted_prod_merge  : 2.301838, 2.304075, 2.306918
sorted_prod_diag   : 3.030672, 3.053302, 3.135322
upprod2            : 4.860378, 4.949804, 4.953891

2 : 512 loops. 39 * 32 = 1248
sorted_prod_brute  : 0.907932, 0.918692, 0.942830
sorted_prod_merge  : 2.559567, 2.561709, 2.604387
upprod1            : 2.700482, 2.701147, 2.757695
sorted_prod_diag   : 2.961776, 2.965271, 2.995747
upprod2            : 5.563303, 5.654425, 5.656695

3 : 128 loops. 68 * 70 = 4760
sorted_prod_brute  : 0.823448, 0.827748, 0.835049
sorted_prod_merge  : 2.591373, 2.592134, 2.685534
upprod1            : 2.760466, 2.763615, 2.795082
sorted_prod_diag   : 2.789673, 2.828662, 2.848498
upprod2            : 5.483504, 5.488450, 5.517847

4 : 32 loops. 122 * 156 = 19032
sorted_prod_brute  : 0.873736, 0.880958, 0.892846
sorted_prod_merge  : 2.701089, 2.742456, 2.818822
upprod1            : 2.875358, 2.881793, 2.922569
sorted_prod_diag   : 2.953450, 2.988184, 3.012430
upprod2            : 5.780552, 5.812967, 5.826775

5 : 8 loops. 173 * 309 = 53457
sorted_prod_brute  : 0.711012, 0.711816, 0.721627
sorted_prod_merge  : 1.997386, 1.999774, 2.033489
upprod1            : 2.137337, 2.172369, 3.335119
sorted_prod_diag   : 2.324447, 2.329552, 2.331095
upprod2            : 4.278704, 4.289019, 4.324436

【讨论】:

  • 生成器的大小不考虑暂停堆栈帧或ys 上的迭代器等成本。
  • sorted_prod_merge() 太明显了 ;-) 说真的,非常好!我的“高级”版本慢了一倍,主要是因为它创建了每个“内部”索引对两次,而第一个版本只创建了一次。您的合并获得了主要的速度优势,因为在内部 merge() 使用 heapreplace() 而不是我的代码的 heappop()+heappush() 对 - 这些堆操作是这些函数中最昂贵的“原语”,因此减少了打电话给他们是真正的帮助。
  • 顺便说一句,在sorted_prod_row 中执行heap.extend(u * y for u in xs); heap.sort() 应该快得多,特别是因为Python 的排序非常擅长对具有长升序运行的列表进行排序(u * y for u in xs 是单次升序运行)。并且排序列表一个堆,所以保留了堆属性。为了得到更多的奇闻趣事,bisect 可以用来查找下一个lo 拆分该排序列表的位置。并且函数的尾部可以变成yield from sorted(heap)。事实上,在该函数中使用堆可能是不明智的;-)
  • @user2357112 好点。而且我想不出任何办法来降低这些成本。
  • 谢谢,@TimPeters!是的,sorted_prod_merge() 很明显,但直到我尝试了其他变体之后我才想到尝试它。 :) 感谢关于sorted_prod_row 的建议。它的主要问题是它消耗了太多的内存,但我想我也可以将这个建议应用于sorted_prod_diag
猜你喜欢
  • 2018-11-16
  • 2015-01-16
  • 2021-04-11
  • 1970-01-01
  • 2013-04-08
  • 2022-11-17
  • 2016-07-09
  • 2016-07-10
  • 2023-03-09
相关资源
最近更新 更多