【问题标题】:KilledWorker error in dask when doing embarrassingly parallel data concatenation在进行令人尴尬的并行数据连接时,dask 中的 KilledWorker 错误
【发布时间】:2021-03-08 19:49:15
【问题描述】:

我有一个embarrassingly parallel workload,我正在读取一组 parquet 文件,将它们连接成更大的 parquet 文件,然后将其写回磁盘。我在一个分布式计算机(带有分布式文件系统)中运行它,大约有 300 个工作人员,每个工作人员都有 20GB 的 RAM。每个单独的工作应该只消耗 2-3 GB 的 RAM,但不知何故,工作人员由于内存错误而崩溃(获取:distributed.scheduler.KilledWorker 异常)。我可以在工作人员的输出日志中看到以下内容:

内存使用率很高,但工作人员没有数据要存储到磁盘。也许 其他一些进程正在泄漏内存。进程内存:18.20 GB

with open('ts_files_list.txt', 'r') as f:
    all_files = f.readlines()

# There are about 500K files
all_files = [f.strip() for f in all_files]

# grouping them into groups of 50. 
# The concatenated df should be about 1GB in memory
npart = 10000
file_pieces = np.array_split(all_files, npart)

def read_and_combine(filenames, group_name):
    dfs = [pd.read_parquet(f) for f in filenames]
    grouped_df = pd.concat(dfs)
    grouped_df.to_parquet(f, engine='pyarrow')

group_names = [f'group{i} for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)

# the following line shouldn't have resulted in a memory error, but it does
dask.compute(map(delayed_func, file_pieces, group_names)) 

我在这里遗漏了一些明显的东西吗? 谢谢!

Dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0

【问题讨论】:

  • 文件是否在本地文件系统中?请注明您的 dask、distributed 和 pyarrow 版本。
  • @mdurant 这些文件位于超级计算机的分布式文件系统上。 Dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0
  • 我会先尝试降级 pyarrow 或尝试使用 fastparquet,然后再做其他事情。

标签: memory-leaks dask distributed-computing dask-distributed dask-delayed


【解决方案1】:

有几个语法错误,但总体而言,工作流程似乎是合理的。

with open('ts_files_list.txt', 'r') as f:
    all_files = f.readlines()

all_files = [f.strip() for f in all_files]

npart = 10000
file_pieces = np.array_split(all_files, npart)

def read_and_combine(filenames, group_name):
    grouped_df = pd.concat(pd.read_parquet(f) for f in filenames)
    grouped_df.to_parquet(group_name, engine='pyarrow')
    del grouped_df # this is optional (in principle dask should clean up these objects)

group_names = [f'group{i}' for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)

dask.compute(map(delayed_func, file_pieces, group_names))

要记住的另一件事是parquet 文件在默认情况下是压缩的,因此在解压缩时,它们占用的内存可能比压缩文件的大小要多得多。不确定这是否适用于您的工作流程,但在遇到内存问题时要记住一些事情。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-03-29
    • 2022-01-13
    • 1970-01-01
    • 1970-01-01
    • 2020-10-27
    • 1970-01-01
    • 2017-02-22
    • 1970-01-01
    相关资源
    最近更新 更多