【问题标题】:Dask crashing when saving to file?保存到文件时Dask崩溃?
【发布时间】:2020-12-30 01:07:39
【问题描述】:

我正在尝试对数据集进行 onehot 编码,然后按特定列进行分组,这样我就可以为该列中的每个项目获取一行,并汇总查看哪些 onehot 列对于该特定行是正确的。它似乎适用于小数据,而使用 dask 似乎适用于大型数据集,但是当我尝试保存文件时遇到问题。我试过 CSV 和镶木地板文件。我想保存结果,然后我可以稍后分块打开它。

这里是显示问题的代码(下面的脚本生成 2M 行和多达 30k 个唯一值到 onehot 编码)。

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait

sizeOfRows = 2000000
columnsForDF = 30000 
partitionsforDask = 500 
print("partition is ", partitionsforDask)


cluster = LocalCluster()
client = Client(cluster)
print(client)



df = pd.DataFrame(np.random.randint(0,columnsForDF,size=(sizeOfRows, 2)), columns=list('AB'))
ddf = dd.from_pandas(df, npartitions=partitionsforDask)
# ddf = ddf.persist()
wait(ddf)

# %%time
# need to globally know the categories before one hot encoding
ddf = ddf.categorize(columns=["B"])
one_hot = dd.get_dummies(ddf, columns=['B'])
print("starting groupby")
# result = one_hot.groupby('A').max().persist() # or to_parquet/to_csv/compute/etc.
# result = one_hot.groupby('A', sort=False).max().to_csv('./daskDF.csv', single_file = True)
result = one_hot.groupby('A', sort=False).max().to_parquet('./parquetFile')
wait(result)

它似乎可以工作,直到它对 csv 或 parquet 进行 groupby。那时,我收到很多关于工人超过 95% 内存的错误,然后程序以“killedworker”异常退出:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
KilledWorker: ("('dataframe-groupby-max-combine-3ddcd8fc854613101b4bdc7fccde32cd', 1, 0, 0)", <Worker 'tcp://127.0.0.1:33815', name: 6, memory: 0, processing: 22>)

监控我的机器,我从来没有接近超过内存,我的驱动器空间超过 300 GB,从未使用过(在此过程中没有创建文件,尽管它在 groupby 部分中)。

我能做什么?

更新 - 我想我会添加一个奖项。我对 .to_csv 也有同样的问题,因为其他人也有类似的问题,我希望它对广大受众有价值。

【问题讨论】:

  • fwiw 我可以在 jupyter.org/tryjupyter.org/try的 jupyter notebook 中重现这个
  • 嘿,你解决了这个问题吗?
  • @HanaKaze nope,无法解决此问题并找到了不同的解决方案。使用嵌入而不是一种热编码。

标签: python pandas dask dask-distributed


【解决方案1】:

让我们首先考虑最终结果:它将是一个包含 30'000 列和 30'000 行的数据框。该对象将占用大约 6.7 GB 的内存。 (可以使用 dtypes 来减少内存占用,而且并非所有组合都可能出现在数据中,但为了简单起见,我们忽略这些点)

现在,假设我们只有两个分区,每个分区都包含所有可能的虚拟变量组合。这意味着每个工作人员至少需要 6.7 GB 来存储 .groupby().max() 对象,但最后一步需要 13.4 GB,因为最终工作人员需要找到这两个对象的 .max。自然,如果您有更多的分区,最终工作人员的内存需求将会增加。通过在相关函数中指定split_every,有一种方法可以在dask 中控制它。例如,如果指定.max(split_every=2),那么任何worker最多会收到2个对象(split_every的默认值为8)。

在处理 500 个分区的早期,每个分区可能只包含可能的虚拟值的一个子集,因此内存要求很低。但是,随着dask 在计算最终结果的过程中不断前进,它会将具有不同虚拟值组合的对象组合在一起,因此内存需求将随着流水线的末端而增长。

原则上,您还可以使用resources 来限制一个工作人员一次将执行多少个任务,但如果工作人员没有足够的内存来处理这些任务,这将无济于事。

解决这个问题的潜在方法是什么?至少有几个选项:

  • 使用拥有更多资源的工人;

  • 简化任务(例如,根据可能类别的子集将任务拆分为多个子任务);

  • 使用delayed/futures 开发自定义工作流,对数据进行排序并实施自定义优先级,确保工作人员在进行最终聚合之前完成部分工作。

如果工人内存是一个约束,那么子集必须非常细粒度。例如,在极限情况下,仅对一个可能的虚拟变量组合的子集将具有非常低的内存需求(初始数据加载和过滤器仍需要足够的内存来适应分区),但当然这是一个极端的例子,会产生数十数千个任务,因此建议使用更大的类别组(平衡任务数量和内存需求)。要查看示例,您可以查看此相关answer

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多