【发布时间】:2021-03-08 19:49:15
【问题描述】:
我有一个embarrassingly parallel workload,我正在读取一组 parquet 文件,将它们连接成更大的 parquet 文件,然后将其写回磁盘。我在一个分布式计算机(带有分布式文件系统)中运行它,大约有 300 个工作人员,每个工作人员都有 20GB 的 RAM。每个单独的工作应该只消耗 2-3 GB 的 RAM,但不知何故,工作人员由于内存错误而崩溃(获取:distributed.scheduler.KilledWorker 异常)。我可以在工作人员的输出日志中看到以下内容:
内存使用率很高,但工作人员没有数据要存储到磁盘。也许 其他一些进程正在泄漏内存。进程内存:18.20 GB
with open('ts_files_list.txt', 'r') as f:
all_files = f.readlines()
# There are about 500K files
all_files = [f.strip() for f in all_files]
# grouping them into groups of 50.
# The concatenated df should be about 1GB in memory
npart = 10000
file_pieces = np.array_split(all_files, npart)
def read_and_combine(filenames, group_name):
dfs = [pd.read_parquet(f) for f in filenames]
grouped_df = pd.concat(dfs)
grouped_df.to_parquet(f, engine='pyarrow')
group_names = [f'group{i} for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)
# the following line shouldn't have resulted in a memory error, but it does
dask.compute(map(delayed_func, file_pieces, group_names))
我在这里遗漏了一些明显的东西吗? 谢谢!
Dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0
【问题讨论】:
-
文件是否在本地文件系统中?请注明您的 dask、distributed 和 pyarrow 版本。
-
@mdurant 这些文件位于超级计算机的分布式文件系统上。 Dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0
-
我会先尝试降级 pyarrow 或尝试使用 fastparquet,然后再做其他事情。
标签: memory-leaks dask distributed-computing dask-distributed dask-delayed