【发布时间】:2021-12-03 10:51:46
【问题描述】:
我正在组合生成器以进行一些数据处理。我首先批处理数据生成器,以便在 API 调用中进行线程化,例如:
from itertools import groupby, count
def batch(data: List[Any], size=4):
c = count()
for _, g in groupby(data, lambda _: next(c)//size):
yield g
然后我将其提供给线程以进行 API 调用
from concurrent.futures import ThreadPoolExecutor
def thread(data: Iterable, func: Callable, n=4):
with ThreadPoolExecutor(max_workers=n) as executor:
for batch in data:
yield executor.map(func, batch)
现在我正在尝试将批次合并回列表/生成器中,以便在生成器管道的下游使用。我试过这个
from itertools import chain
def flat_map(batches: Iterable):
for i in list(chain(batches)):
yield i
但i 似乎仍然是生成器,而不是列表中的项目?
【问题讨论】:
-
如果可以复制&粘贴&测试,我可以测试我认为可以解决的问题...
-
chain(*batches)?
标签: python functional-programming