【问题标题】:python or dask parallel generator?python或dask并行生成器?
【发布时间】:2019-04-19 04:07:32
【问题描述】:

是否可以在 python 中(可能使用 dask,也可能使用多处理)在内核上“放置”生成器,然后并行地逐步遍历生成器并处理结果?

它需要特别是生成器(或带有__iter__的对象);生成器生成的所有生成元素的列表将不适合内存。

特别是:

使用 pandas,我可以调用 read_csv(...iterator=True),这给了我一个迭代器 (TextFileReader) - 我可以 for in 它或多次显式调用 next。整个 csv 永远不会被读入内存。不错。

每次我从迭代器中读取下一个块时,我也会对其执行一些昂贵的计算。

但现在我有 2 个这样的文件。我想创建 2 个这样的生成器,并在一个内核上“放置”1 个,在另一个内核上“放置”1 个,这样我就可以:

 result = expensive_process(next(iterator))

在每个核心上,并行,然后组合并返回结果。重复此步骤,直到一个或两个生成器都超出产量。

看起来 TextFileReader 不是可腌制的,也不是生成器。我不知道如何在 dask 或 multiprocessing 中执行此操作。有这种模式吗?

【问题讨论】:

  • 您能否更具体一点 - 您是否有两个 CSV 文件(可能格式相同 - 或者至少是兼容的列等) - 您想对其应用一些聚合函数?例如expensive_process 的性质是什么?
  • 好吧,说实话,我没有 2 个(是的,格式相同的 csvs)——我实际上有 300 多个。而“expensive_process”的意思就是——在这种情况下,将 csv 列之一从字符串转换为数字数组,然后渲染这些数字。因此,一些昂贵的函数应用于 csv 的每一行。我试图避免连接许多切片,然后将结果发送到并行处理 - 这将是大量的数据传输......

标签: python pandas python-multiprocessing dask


【解决方案1】:

Dask 的 read_csv 旨在以块的形式从多个文件中加载数据,您可以指定块大小。当您对生成的数据帧进行操作时,您将按块工作,这正是首先使用 Dask 的重点。应该不需要使用您的迭代器方法。

您最有可能使用的 dask 数据框方法是 map_partitions()

如果你真的想使用迭代器的想法,你应该研究dask.delayed,它能够并行化任意python函数,通过发送函数的每次调用(使用不同的文件-每个人的名字)给你的工人。

【讨论】:

  • 嗯,我很困惑。现在,如果我有两个文件并调用 dask.dataframe.read_csv([f1, f2], blocksize=100000) 我会得到 2219 个分区 - 第一个 ~1000 个来自 f1 ,其余来自 f2 。我想要做的是并行遍历 f1 和 f2:f1 和 f2 的前 500 行,f1&f2 的第二个 500 行等...如果我调用 map_partitions,它不会立即遍历所有分区?这是因为我无法同时将所有映射结果放入内存中 - 我必须一次使用一批...
【解决方案2】:

幸运的是,我认为这个问题很好地映射到了 python 的多处理 .Process 和 .Queue。

def data_generator(whatever):
   for v in something(whatever):
      yield v

def generator_constructor(whatever):
   def generator(outputQueue):
      for d in data_generator(whatever):
         outputQueue.put(d)
      outputQueue.put(None) # sentinel
   return generator

def procSumGenerator():
   outputQs = [Queue(size) for _ in range(NumCores)]
   procs = [Process(target=generator_constructor(whatever),
                    args=(outputQs[i],))
            for i in range(NumCores)] 

   for proc in procs: proc.start()

   # until any output queue returns a None, collect 
   # from all and yield
   done = False
   while not done:
      results = [oq.get() for oq in outputQs]
      done = any(res is None for res in results)
      if not done:
         yield some_combination_of(results)

   for proc in procs: terminate()

for v in procSumGenerator():
   print(v)

也许使用 Dask 可以做得更好?我发现我的解决方案相当快地使大量生成数据的网络饱和 - 我正在使用 pandas 操作 csvs 并返回大型 numpy 数组。

https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-12-19
    • 2018-03-19
    • 2017-09-22
    • 2018-11-24
    • 2010-12-28
    • 2021-09-01
    • 2020-09-26
    • 2012-05-30
    相关资源
    最近更新 更多