【Question title】: Dask MemoryError
【Posted】: 2020-11-03 09:41:21
【Question】:

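I'm trying to use dask to parallelise some code. The function I'm parallelising takes 3 arguments, but only one of them changes as the loop proceeds. Here is what I have so far: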

import dask
import numpy as np
from dask import distributed
from dask_jobqueue import SLURMCluster

# Set up client
cluster = SLURMCluster(cores=1, memory='40 GB', 
                queue='brc', interface='em1',
                log_directory='./dask_logs')
cluster.scale(jobs=2)
client = distributed.Client(cluster)

# Function to be parallelised
def nT_loop(i, P, inv_DiagCe):
    x = P[:,i]* np.squeeze(-inv_DiagCe)
    return x

P = np.random.rand(64620, 64620)
inv_DiagCe = np.random.rand(64620)

# Run loop
res1 = []
for i in range(2):
    res = dask.delayed(nT_loop)(i, P, inv_DiagCe)
    res1.append(res)

# Compute results
res1 = dask.compute(*res1)

When I run it, I get the following error:

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

MemoryError: 

During handling of the above exception, another exception occurred:

MemoryError                               Traceback (most recent call last)
~/wang_model/estimation.py in 
     208     #P[:,i] = P[:,i]* np.squeeze(-inv_DiagCe) #bsxfun(@times, P(:,i), -inv_DiagCe');
     209 
---> 210 res1 = dask.compute(*res1)
     211 print(datetime.now().strftime("%H:%M:%S"))

~/miniconda3/envs/python38/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2703         Client.compute: Compute asynchronous collections
   2704         """
-> 2705         futures = self._graph_to_futures(
   2706             dsk,
   2707             keys=set(flatten([keys])),

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2639                 {
   2640                     "op": "update-graph",
-> 2641                     "tasks": valmap(dumps_task, dsk),
   2642                     "dependencies": dependencies,
   2643                     "keys": list(map(tokey, keys)),

~/miniconda3/envs/python38/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

~/miniconda3/envs/python38/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
   3356             return d
   3357         elif not any(map(_maybe_complex, task[1:])):
-> 3358             return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3359     return to_serialize(task)
   3360 

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   3365 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   3366     """ Dump an object to bytes, warn if those bytes are large """
-> 3367     b = dumps(obj, protocol=4)
   3368     if not _warn_dumps_warned[0] and len(b) > limit:
   3369         _warn_dumps_warned[0] = True

~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

~/miniconda3/envs/python38/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71                 file, protocol=protocol, buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

~/miniconda3/envs/python38/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    561     def dump(self, obj):
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
    565             if "recursion" in e.args[0]:

MemoryError:

I think this may be related to the large size of 'P'. Does anyone have any suggestions?

Thanks
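For scale, the memory needed for P alone can be checked without allocating it. This is a back-of-the-envelope sketch that uses only the shape from the code above and the fact that np.random.rand returns float64 values:

```python
import numpy as np

# Shape of P in the question
n = 64620

# np.random.rand returns float64, i.e. 8 bytes per element
nbytes = n * n * np.dtype(np.float64).itemsize

print(f"{nbytes:,} bytes = {nbytes / 1e9:.1f} GB = {nbytes / 2**30:.1f} GiB")
# prints: 33,405,955,200 bytes = 33.4 GB = 31.1 GiB
```

Pickling an array of that size into the task graph (the pickle.dumps / cloudpickle.dumps calls in the traceback) needs roughly another copy's worth of memory in the client process, on top of the array itself.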

【Discussion】:

  • Can you reproduce the same exception after first applying the import dill as pickle trick? An update would be appreciated.

Tags: python parallel-processing dask


【Answer 1】:

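Here

P = np.random.rand(64620, 64620)

you generate a huge array in memory and then make copies of it to send to the workers. Your function also creates and returns a new array on every call. You should at the very least do this step separately with client.scatter, rather than including the array in the graph.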
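A minimal sketch of the client.scatter route. Assumptions: an in-process Client(processes=False) stands in for the SLURM cluster, P is shrunk to n×n so it runs locally, and scatter_demo is a hypothetical helper name. The large arrays are sent to the workers once, and each task only carries a lightweight future that the worker resolves back into the array. P is still built on the client, so this avoids pickling the array into every task of the graph but not the cost of creating it locally.

```python
import numpy as np
from dask.distributed import Client


def nT_loop(i, P, inv_DiagCe):
    # Scale column i of P element-wise by -inv_DiagCe (as in the question)
    return P[:, i] * np.squeeze(-inv_DiagCe)


def scatter_demo(n=1000, ncols=2):
    # Hypothetical helper; on the real cluster use distributed.Client(cluster)
    with Client(processes=False) as client:
        rng = np.random.default_rng(0)
        P = rng.random((n, n))
        inv_DiagCe = rng.random(n)

        # Ship the big inputs to the workers once; keep only futures locally
        P_fut = client.scatter(P, broadcast=True)
        d_fut = client.scatter(inv_DiagCe, broadcast=True)

        # Futures passed as arguments are resolved to the data on the worker
        futures = [client.submit(nT_loop, i, P_fut, d_fut) for i in range(ncols)]
        results = client.gather(futures)
    return P, inv_DiagCe, results
```

Each element of results is one scaled column of length n, so only small arrays travel back to the client.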

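But really, dask has a perfectly good interface, dask.array, designed to process large arrays in chunks without blowing up memory. I'd suggest using it instead of your delayed-function approach.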
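A sketch of the dask.array route, assuming the goal is the same element-wise operation as nT_loop applied to every column. build is a hypothetical helper, and the chunk size is an assumption rather than a tuned value. P is created lazily chunk by chunk on the workers instead of on the client, and calling .compute() on a slice only evaluates the chunks that slice touches. At the full 64620×64620 size the complete result is as large as P, so in practice it would be reduced or written to disk (for example with to_zarr) rather than computed into memory.

```python
import numpy as np
import dask.array as da


def build(n=64620, chunk=4000):
    # Hypothetical helper: everything here is lazy, nothing is allocated yet.
    # P is generated chunk by chunk on the workers, never as one big array.
    P = da.random.random((n, n), chunks=(chunk, chunk))
    inv_DiagCe = da.random.random(n, chunks=chunk)
    # Same operation as nT_loop for every column i at once:
    # scaled[:, i] == P[:, i] * -inv_DiagCe (broadcast across columns)
    scaled = P * (-inv_DiagCe)[:, None]
    return P, inv_DiagCe, scaled


# Small size for illustration; slicing first keeps the computation small
P, inv_DiagCe, scaled = build(n=1000, chunk=250)
first_two = scaled[:, :2].compute()
```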

【Discussion】:
