【发布时间】:2021-03-11 11:33:39
【问题描述】:
我有一个大约 6000 万行的 Pandas DataFrame。前 60 行对应于第一组,依此类推。这些组中的每一个都需要并行处理,并且每个组都返回一个大于 4GB 的 NumPy 数组。我有足够的 RAM 和内核来并行处理大约 100 个这样的组。最后,我必须将每个组的结果相加才能得到我的最终结果,所以最终结果不会大于每个组的结果。
处理大致是:在给定大小的网格(NumPy 数组)上对一组数据进行分箱,并计算该网格与其自身的张量外积(numpy.multiply.outer)。然后将结果相加得到我的最终数组。
如果我按顺序处理这些组,则处理可能需要几天时间。所以,我需要并行处理它们。
首先,我尝试了multiprocessing。由于组的数量很大,我将数据帧拆分为每组 10,000 个组。一个函数将接收一个块,处理块中的每个组并返回这些结果的总和(我最终结果的部分总和)。为了得到我的最终结果,我只需要总结每个过程的结果。我可以看到进程并行运行,但是在返回结果时出现错误'OverflowError('cannot serialize a bytes objects larger than 4GiB',)'。我尝试了这个解决方案:python multiprocessing - OverflowError('cannot serialize a bytes object larger than 4GiB') 这导致我遇到了答案中提到的第二个错误。但是,我不能将我的函数重新定义为 void,因为这需要我将数据存储在文件中,而且我没有足够的磁盘空间来存储所有数据。此外,这些进程将竞争写入磁盘,这会产生瓶颈;然后读取文件,重构数组并添加结果也需要很长时间。
然后我尝试了 Dask。首先,我将 Dask 数据帧划分为 60 个组,创建一个处理每个组的函数,并使用 delayed 调用它。
from dask import delayed
results = []
for partition in ddf.partitions:
result = delayed(func)(partition)
results.append(result)
delayed(sum)(results).compute()
但是,大多数进程大部分时间都在休眠,我看不到太多并行性。显然,Dask 不能很好地处理大型任务图。
为了避免出现大型任务图,我将函数替换为一个可以采用大型数据帧(包含许多组)并在函数内处理该数据帧的每个组的函数(类似于 multiprocessing 方法)。
import numpy as np, pandas as pd
def func(df):
group_len = 60
ngroups = int(len(df) / group_len) # len(df) is always a multiple of group_len
sum_array = np.zeros(output_expected_shape) # Here the shape can be up to 6-dimensional
for group in range(ngroups):
# Do the processing...
sum_array += group_result
return sum_array
我对数据帧进行了分区,因此每个分区将有 600,000 行(100 个组),并使用delayed 方法调用它。然而,再一次,大多数进程大部分时间都在休眠,我无法观察到真正的并行性。我还注意到使用GUI 时,工作人员正在存储大量数据,即使有足够的 RAM 可用。我尝试让每个分区有 6,000,000 行,但这也不起作用(同时让所有这些核心都未使用似乎不是最佳解决方案)。
然后,我尝试了 Dask map_partitions。问题在于,当您需要每个函数生成单个 NumPy 数组时,它无法正常工作。您可以在这个问题中看到问题所在:How to return one NumPy array per partition in Dask?。但总而言之,它返回一个数组,其中部分结果垂直堆叠。为了获得真正的结果,我必须对数组进行切片,获取与部分结果相对应的每个元素并将它们相加。但是 1)这破坏了使用并行性的意义,以及 2)返回的数组可能太大,因为它包含许多不同的结果并且它可能不适合内存。
显然,如果定义了 chunksize,但块大小是由数据量('16MB'、'3GB')而不是行数(这是我需要的)定义的,map_partitions 方法会很好用因为这些组对应于其中的特定数量)。
我希望能够有效地并行处理这些组。到目前为止,顺序解决方案仍然是我的最佳选择(当 Dask 工作人员在一段时间后开始引发 TCP 超时错误时,唯一最终让我得到结果的解决方案),但它太慢了,并且留下了大量未使用的资源。
【问题讨论】:
标签: pandas dataframe parallel-processing multiprocessing dask