【发布时间】:2021-07-20 03:30:02
【问题描述】:
我正在深入研究 Dask,并且(大部分)对它感到满意。但是我无法理解以下场景中发生了什么。 TBH,我敢肯定过去有人问过这样的问题,但是在搜索了一段时间后,我似乎找不到真正击中钉子的问题。所以我们来了!
在下面的代码中,你可以看到一个简单的 python 函数,上面有一个 Dask 延迟装饰器。在我的真实用例场景中,这将是一个“黑匣子”类型的函数,我不在乎会发生什么,只要它保持 4 GB 的内存预算并最终返回一个 pandas 数据帧。在这种情况下,我特别选择了值N=1.5e8,因为这会导致总内存占用接近 2.2 GB(很大,但仍然在预算范围内)。最后,当将此文件作为脚本执行时,我有一个“数据管道”,它只是为一些 ID 运行黑盒函数,最后构建一个结果数据框(然后我可以用它做更多的事情)
执行此操作时会出现令人困惑的位。我可以看到一次只执行了两个函数调用(这是我所期望的),但我收到了警告消息distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 3.16 GiB -- Worker memory limit: 3.73 GiB,此后不久脚本过早退出。这个内存使用量来自哪里?请注意,如果我增加 memory_limit="8GB"(这实际上比我的计算机更多),那么脚本运行良好,并且我的打印语句通知我数据帧确实只使用了 2.2 GB 的内存
请帮助我理解这种行为,并希望实施一种更安全的方法
非常感谢!
顺便说一句:
- 如果有帮助,我使用的是 python 3.8.8、dask 2021.4.0 和分布式 2021.4.0
- 我还在 Linux (Ubuntu) 机器和 Mac M1 上确认了这种行为。它们都表现出相同的行为,尽管 Mac M1 因相同的原因而失败,并且内存使用量要少得多(
N=3e7,或大约 500 MB)
import time
import pandas as pd
import numpy as np
from dask.distributed import LocalCluster, Client
import dask
@dask.delayed
def do_pandas_thing(id):
print(f"STARTING: {id}")
N = 1.5e8
df = pd.DataFrame({"a": np.arange(N), "b": np.arange(N)})
print(
f"df memory usage {df.memory_usage().sum()/(2**30):.3f} GB",
)
# Simulate a "long" computation
time.sleep(5)
return df.iloc[[-1]] # return the last row
if __name__ == "__main__":
cluster = LocalCluster(
n_workers=2,
memory_limit="4GB",
threads_per_worker=1,
processes=True,
)
client = Client(cluster)
# Evaluate "black box" functions with pandas inside
results = []
for i in range(10):
results.append(do_pandas_thing(i))
# compute
r = dask.compute(results)[0]
print(pd.concat(r, ignore_index=True))
【问题讨论】:
标签: pandas dask dask-distributed