【发布时间】:2021-06-08 19:13:54
【问题描述】:
我正在使用 dask 处理来自许多参数变化的数据,我的目标是通过对 dask 数组的操作构建 600 000(案例或列数)的最终 dask 数据帧,该数组由形状小于 2000 的小数组构成. 在这里,我的最终数据帧计算了 6400 个案例
dd_final.compute()
0_95_euclidean 1_95_euclidean 2_95_euclidean 3_95_euclidean ... 96_80_l1 97_80_l1 98_80_l1 99_80_l1
0 0.005670 0.010449 0.010756 0.009914 ... 0.007422 0.002066 0.009693 0.003475
1 0.006255 0.009970 0.007987 0.007785 ... 0.006119 0.002104 0.009638 0.004142
2 0.011956 0.018662 0.016426 0.015260 ... 0.013276 0.003897 0.019816 0.007479
3 0.021639 0.037590 0.036749 0.028090 ... 0.029751 0.009725 0.038956 0.011870
4 0.014482 0.022963 0.025416 0.017909 ... 0.017033 -0.002616 0.026231 0.000978
... ... ... ... ... ... ... ... ... ...
1289 0.597443 1.044522 0.898732 0.940219 ... 0.914094 0.792133 0.744501 0.632575
1290 0.594463 1.041562 0.894501 0.935068 ... 0.913409 0.790555 0.742357 0.628366
1291 0.592523 1.035600 0.891222 0.932510 ... 0.907414 0.786722 0.738844 0.627611
1292 0.606415 1.059642 0.912963 0.951523 ... 0.922719 0.800610 0.751161 0.640515
1293 0.601242 1.049654 0.903112 0.942681 ... 0.915391 0.794133 0.744752 0.636788
[1294 rows x 6400 columns]
第一种方法:我对每个函数都使用池星图来加速 8 核 CPU 的操作,并将结果放入 dask 数组中。
def MP_a_func(func,iterable,proc,chunk):
#
pool=multiprocessing.Pool(processes=proc)
Result=pool.starmap_async(func,iterable,chunksize=chunk)
#
return Result
if __name__ == '__main__':
performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)
da_arr=da.from_array(performances.get(),chunks=chunk)
#... Some operations
#...
dd_final=dd.from_dask_array(da_arr).repartition(chunk)
此方法失败,因为在存储到 dask 数组之前内存不足以存储 MP 对象。
第二种方法:我想使用池星图,但将我的可迭代切片并在每个切片上附加 dask 数组
for iterable in [iter1,iter2,...,iter10000]:
if __name__ == '__main__':
performances=MP_a_func(Post_processing_Weights,iterable,proc,chunk)
partial_da_arr=da.from_array(performances.get(),chunks=chunk)
# append or assign to da_arr ??
如何在每个步骤中使用 append 或分配给 dask 数组而不加载内存,或者有更好的方法吗?
感谢您的帮助
【问题讨论】:
-
我已经使用 dask
delayedAPI 更新了上面的代码,其中import dask.delayed as delayed if __name__ == '__main__': performances=delayed(MP_a_func(Post_processing_Weights,iterable,proc,chunk) )
标签: python pandas multiprocessing dask