【发布时间】:2019-04-19 04:07:32
【问题描述】:
是否可以在 python 中(可能使用 dask,也可能使用多处理)在内核上“放置”生成器,然后并行地逐步遍历生成器并处理结果?
它需要特别是生成器(或带有__iter__的对象);生成器生成的所有生成元素的列表将不适合内存。
特别是:
使用 pandas,我可以调用 read_csv(...iterator=True),这给了我一个迭代器 (TextFileReader) - 我可以 for in 它或多次显式调用 next。整个 csv 永远不会被读入内存。不错。
每次我从迭代器中读取下一个块时,我也会对其执行一些昂贵的计算。
但现在我有 2 个这样的文件。我想创建 2 个这样的生成器,并在一个内核上“放置”1 个,在另一个内核上“放置”1 个,这样我就可以:
result = expensive_process(next(iterator))
在每个核心上,并行,然后组合并返回结果。重复此步骤,直到一个或两个生成器都超出产量。
看起来 TextFileReader 不是可腌制的,也不是生成器。我不知道如何在 dask 或 multiprocessing 中执行此操作。有这种模式吗?
【问题讨论】:
-
您能否更具体一点 - 您是否有两个 CSV 文件(可能格式相同 - 或者至少是兼容的列等) - 您想对其应用一些聚合函数?例如
expensive_process的性质是什么? -
好吧,说实话,我没有 2 个(是的,格式相同的 csvs)——我实际上有 300 多个。而“expensive_process”的意思就是——在这种情况下,将 csv 列之一从字符串转换为数字数组,然后渲染这些数字。因此,一些昂贵的函数应用于 csv 的每一行。我试图避免连接许多切片,然后将结果发送到并行处理 - 这将是大量的数据传输......
标签: python pandas python-multiprocessing dask