【问题标题】:How to process small dataframes that return large results in parallel如何处理并行返回大结果的小数据帧
【发布时间】:2021-03-11 11:33:39
【问题描述】:

我有一个大约 6000 万行的 Pandas DataFrame。前 60 行对应于第一组,依此类推。这些组中的每一个都需要并行处理,并且每个组都返回一个大于 4GB 的 NumPy 数组。我有足够的 RAM 和内核来并行处理大约 100 个这样的组。最后,我必须将每个组的结果相加才能得到我的最终结果,所以最终结果不会大于每个组的结果。

处理大致是:在给定大小的网格(NumPy 数组)上对一组数据进行分箱,并计算该网格与其自身的张量外积(numpy.multiply.outer)。然后将结果相加得到我的最终数组。

如果我按顺序处理这些组,则处理可能需要几天时间。所以,我需要并行处理它们。

首先,我尝试了multiprocessing。由于组的数量很大,我将数据帧拆分为每组 10,000 个组。一个函数将接收一个块,处理块中的每个组并返回这些结果的总和(我最终结果的部分总和)。为了得到我的最终结果,我只需要总结每个过程的结果。我可以看到进程并行运行,但是在返回结果时出现错误'OverflowError('cannot serialize a bytes objects larger than 4GiB',)'。我尝试了这个解决方案:python multiprocessing - OverflowError('cannot serialize a bytes object larger than 4GiB') 这导致我遇到了答案中提到的第二个错误。但是,我不能将我的函数重新定义为 void,因为这需要我将数据存储在文件中,而且我没有足够的磁盘空间来存储所有数据。此外,这些进程将竞争写入磁盘,这会产生瓶颈;然后读取文件,重构数组并添加结果也需要很长时间。

然后我尝试了 Dask。首先,我将 Dask 数据帧划分为 60 个组,创建一个处理每个组的函数,并使用 delayed 调用它。

from dask import delayed
results = []
for partition in ddf.partitions:
    result = delayed(func)(partition)
    results.append(result)

delayed(sum)(results).compute()

但是,大多数进程大部分时间都在休眠,我看不到太多并行性。显然,Dask 不能很好地处理大型任务图。

为了避免出现大型任务图,我将函数替换为一个可以采用大型数据帧(包含许多组)并在函数内处理该数据帧的每个组的函数(类似于 multiprocessing 方法)。

import numpy as np, pandas as pd
def func(df):
    group_len = 60
    ngroups = int(len(df) / group_len) # len(df) is always a multiple of group_len
    sum_array = np.zeros(output_expected_shape) # Here the shape can be up to 6-dimensional
    for group in range(ngroups):
        # Do the processing...
        sum_array += group_result
    return sum_array

我对数据帧进行了分区,因此每个分区将有 600,000 行(100 个组),并使用delayed 方法调用它。然而,再一次,大多数进程大部分时间都在休眠,我无法观察到真正的并行性。我还注意到使用GUI 时,工作人员正在存储大量数据,即使有足够的 RAM 可用。我尝试让每个分区有 6,000,000 行,但这也不起作用(同时让所有这些核心都未使用似乎不是最佳解决方案)。

然后,我尝试了 Dask map_partitions。问题在于,当您需要每个函数生成单个 NumPy 数组时,它无法正常工作。您可以在这个问题中看到问题所在:How to return one NumPy array per partition in Dask?。但总而言之,它返回一个数组,其中部分结果垂直堆叠。为了获得真正的结果,我必须对数组进行切片,获取与部分结果相对应的每个元素并将它们相加。但是 1)这破坏了使用并行性的意义,以及 2)返回的数组可能太大,因为它包含许多不同的结果并且它可能不适合内存。

显然,如果定义了 chunksize,但块大小是由数据量('16MB'、'3GB')而不是行数(这是我需要的)定义的,map_partitions 方法会很好用因为这些组对应于其中的特定数量)。

我希望能够有效地并行处理这些组。到目前为止,顺序解决方案仍然是我的最佳选择(当 Dask 工作人员在一段时间后开始引发 TCP 超时错误时,唯一最终让我得到结果的解决方案),但它太慢了,并且留下了大量未使用的资源。

【问题讨论】:

    标签: pandas dataframe parallel-processing multiprocessing dask


    【解决方案1】:

    使用delayed(sum)(results).compute() 的问题是您要求将所有results 一次性传递给sum。这对于一小部分结果来说不是问题,但是当您的结果列表超过工作人员的组合内存容量时,您的管道就会中断。

    解决此问题的最简单方法是使用现有集合来实现此问题,例如使用建议 here。 (你提到了行分组的问题,不过这个已经解决了here

    另一种减少内存使用的方法是使用tree summation 之类的东西积极聚合所有分区/块/数组,请参阅docs

    L = zs
    while len(L) > 1:
        new_L = []
        for i in range(0, len(L), 2):
            lazy = add(L[i], L[i + 1])  # add neighbors
            new_L.append(lazy)
        L = new_L                       # swap old list for new
    
    dask.compute(L)
    

    【讨论】:

    • 嗨,苏丹。是的,多亏了我可以使用map_partitions 的分组帮助。集合的问题是我得到了一个非常嵌套的延迟操作列表,比如[[[[[[Delayed(('func-df4a93', 0, 0, 0))]]]]], [[[[[Delayed(('func-df4a93', 1, 0, 0))]]]]], ...]]]]]]。即使,我不知道是什么原因造成的,我尝试递归地展平列表。但是,我仍然得到KeyError: ('func-5887b6', 5, 0, 0),但密钥存在于我的列表中。不知道是不是和延迟操作中的双括号有关(可以在列表中看到)。
    • 再想一想,它不应该是双括号,因为键包含它。我无法找到有关此错误的更多信息。
    • 嗯,看起来确实需要在管道中的某个步骤解包列表。您是否像第一个链接中建议的 mdurant 一样使用 *list_of_delayed
    • 是的,我正在对计算调用中的列表进行解包。此外,在展平列表之后(这可能不是最好的方法,但我想看看发生了什么),我最终得到 [Delayed(('func-a0effc', 0, 0, 0)), Delayed((' func-a0effc', 1, 0, 0)), ... ]`。我会尝试看看是否需要在其他地方进行一些拆包。但是如果我在一个小网格上运行我之前的代码(意味着返回的数组很小)并使用几个组(意味着任务图很小),我会得到正确的结果。
    • 我刚刚注意到,有趣的是,我的输出应该是一个 6 维的 NumPy 数组,而我的延迟操作列表也是 6 维的。我将尝试在一个元素或类似元素的列表中返回每个数组。
    猜你喜欢
    • 2019-11-14
    • 2013-06-03
    • 2013-01-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-14
    • 2021-10-10
    • 1970-01-01
    相关资源
    最近更新 更多