【问题标题】:Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?给定 N 个生成器,是否可以创建一个在并行进程中运行它们并生成这些生成器的 zip 的生成器?
【发布时间】:2021-01-25 01:03:30
【问题描述】:

假设我有 N 个生成器 gen_1, ..., gen_N,其中每个生成器都会产生相同数量的值。我想要一个生成器gen,以便它在 N 个并行进程中运行 gen_1、...、gen_N 并产生 (next(gen_1), next(gen_2), ... next(gen_N))

那是我想要的:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

每个 gen_i 都在自己的进程上运行。是否有可能做到这一点?我尝试在以下虚拟示例中执行此操作,但没有成功:

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

但是我收到错误TypeError: cannot pickle 'generator' object

编辑:

我已经修改了@darkonaut 的答案以满足我的需要。我发布它以防你们中的一些人觉得它有用。我们首先定义几个效用函数:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)

    return [zip_longest(*chunk) for chunk in chunks]

以下类负责将任意数量的生成器拆分为 n(进程数)批次并处理它们以产生所需的结果:

import multiprocessing as mp

class GeneratorParallelProcessor:
SENTINEL = 'S'

def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
    self.n_processes = n_processes
    self.generators = split_generators_into_batches(list(generators), n_processes)
    self.queue = mp.SimpleQueue()
    self.barrier = mp.Barrier(n_processes + 1)
    self.sentinels = [self.SENTINEL] * n_processes

    self.processes = [
        mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
    ]

def process(self):
    for p in self.processes:
        p.start()

    while True:
        results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
        if results != self.sentinels:
            yield results
            self.barrier.wait()
        else:
            break

    for p in self.processes:
        p.join()

def _worker(self, barrier, queue, generator):
    for x in generator:
        queue.put(x)
        barrier.wait()
    queue.put(self.SENTINEL)

要使用它,只需执行以下操作:

parallel_processor = GeneratorParallelProcessor(generators)

    for grouped_generator in parallel_processor.process():
        output_handler(grouped_generator)

【问题讨论】:

  • 如果您已经拥有生成器对象,则没有通用的方法可以将它们移植到另一个进程中。您需要在每个 Process 开始时使用一个目标函数,该函数将在那里创建生成器。
  • 即使您设法做到这一点,GIL 也可能会阻止它们并行运行。
  • @MarkRansom 他使用的是multiprocessing 而不是线程,所以我认为 GIL 不适用于这里。
  • @thegamecracks 抱歉,我错过了;你是对的,它会从等式中删除 GIL。但它确实使数据交换更加棘手。

标签: python parallel-processing multiprocessing generator python-multiprocessing


【解决方案1】:

可以通过一些努力获得这样的“统一并行生成器(UPG)”(尝试创造一个名称),但正如@jasonharper 已经提到的,你肯定需要组装子子进程中的生成器,因为无法腌制正在运行的生成器。

下面的模式是可重复使用的,只有生成器函数gen() 是此示例自定义的。该设计使用multiprocessing.SimpleQueue 将生成器结果返回给父级,并使用multiprocessing.Barrier 进行同步。

调用Barrier.wait() 将阻塞调用者(任何进程中的线程),直到指定数量的parties 调用了.wait(),因此当前在Barrier 上等待的所有线程同时被释放。此处使用Barrier 可确保仅在父级从迭代中收到所有结果后才开始计算进一步的生成器结果,这可能需要保持整体检查内存消耗。

使用的并行工作人员的数量等于您在gen_args_tuples-iterable 中提供的参数元组的数量,因此gen_args_tuples=zip(range(4)) 将使用四个工作人员。有关详细信息,请参阅代码中的 cmets。

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

输出:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0

【讨论】:

  • 非常感谢。这是诀窍!我认为这对于您不想在每次迭代中重新开始的计算密集型流程非常有用。一个帮助 python 以更简单的方式使用多处理的库将非常有用。
  • @creyesk 不客气。是的,IIRC 你不是第一个要求这样的人。
【解决方案2】:

我采用了一些不同的方法,您可以相应地修改下面的示例。 所以在主脚本的某个地方根据你的需要初始化池,你只需要这 2 行

from multiprocessing import Pool

pool = Pool(processes=4)

那么你可以像这样定义一个生成器函数: (请注意,生成器输入被假定为包含所有生成器的任何可迭代)

def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
    batch = pool.map_async(next, generators)  # defines the next round of values
    results = list(batch.get)  # actual calculation done here
    yield results
return 

我们在 while 循环中这样定义结果条件,因为当生成器停止生成值时,带有 next 和生成器的映射对象返回一个空列表。所以此时我们只需终止并行生成器。

编辑

显然 multiproccecing 池和地图不能很好地与生成器配合使用,使得上述代码无法按预期工作,因此 在以后更新之前不要使用

至于 pickle 错误,似乎某些绑定函数不支持多处理库中需要的用于传输对象和函数的 pickle,作为解决方法,pathos mutliprocessing 库使用 dill 解决了对 pickle 的需求,并且是您可能想尝试的选项,在 Stack Overflow 中搜索您的错误,您还可以找到一些更复杂的解决方案,使用自定义代码来腌制所需的功能。

【讨论】:

  • 这是一个不错的方法。但是,如果我尝试产生 batch.get() 那么它实际上会异步运行地图并且我得到相同的 TypeError: cannot pickle 'generator' object. 我错过了什么吗?
猜你喜欢
  • 2011-12-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-24
  • 1970-01-01
  • 2018-12-11
  • 2014-01-02
相关资源
最近更新 更多