【发布时间】:2021-01-25 01:03:30
【问题描述】:
假设我有 N 个生成器 gen_1, ..., gen_N,其中每个生成器都会产生相同数量的值。我想要一个生成器gen,以便它在 N 个并行进程中运行 gen_1、...、gen_N 并产生 (next(gen_1), next(gen_2), ... next(gen_N))
那是我想要的:
def gen():
yield (next(gen_1), next(gen_2), ... next(gen_N))
每个 gen_i 都在自己的进程上运行。是否有可能做到这一点?我尝试在以下虚拟示例中执行此操作,但没有成功:
A = range(4)
def gen(a):
B = ['a', 'b', 'c']
for b in B:
yield b + str(a)
def target(g):
return next(g)
processes = [Process(target=target, args=(gen(a),)) for a in A]
for p in processes:
p.start()
for p in processes:
p.join()
但是我收到错误TypeError: cannot pickle 'generator' object。
编辑:
我已经修改了@darkonaut 的答案以满足我的需要。我发布它以防你们中的一些人觉得它有用。我们首先定义几个效用函数:
from itertools import zip_longest
from typing import List, Generator
def grouper(iterable, n, fillvalue=iter([])):
"Collect data into fixed-length chunks or blocks"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def split_generators_into_batches(generators: List[Generator], n_splits):
chunks = grouper(generators, len(generators) // n_splits + 1)
return [zip_longest(*chunk) for chunk in chunks]
以下类负责将任意数量的生成器拆分为 n(进程数)批次并处理它们以产生所需的结果:
import multiprocessing as mp
class GeneratorParallelProcessor:
SENTINEL = 'S'
def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
self.n_processes = n_processes
self.generators = split_generators_into_batches(list(generators), n_processes)
self.queue = mp.SimpleQueue()
self.barrier = mp.Barrier(n_processes + 1)
self.sentinels = [self.SENTINEL] * n_processes
self.processes = [
mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
]
def process(self):
for p in self.processes:
p.start()
while True:
results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
if results != self.sentinels:
yield results
self.barrier.wait()
else:
break
for p in self.processes:
p.join()
def _worker(self, barrier, queue, generator):
for x in generator:
queue.put(x)
barrier.wait()
queue.put(self.SENTINEL)
要使用它,只需执行以下操作:
parallel_processor = GeneratorParallelProcessor(generators)
for grouped_generator in parallel_processor.process():
output_handler(grouped_generator)
【问题讨论】:
-
如果您已经拥有生成器对象,则没有通用的方法可以将它们移植到另一个进程中。您需要在每个
Process开始时使用一个目标函数,该函数将在那里创建生成器。 -
即使您设法做到这一点,GIL 也可能会阻止它们并行运行。
-
@MarkRansom 他使用的是
multiprocessing而不是线程,所以我认为 GIL 不适用于这里。 -
@thegamecracks 抱歉,我错过了;你是对的,它会从等式中删除 GIL。但它确实使数据交换更加棘手。
标签: python parallel-processing multiprocessing generator python-multiprocessing