【发布时间】:2020-11-03 09:41:21
【问题描述】:
我正在尝试使用 dask 来并行化一段代码。要并行化的函数有 3 个参数,但在循环过程中只有其中一个参数会变化。以下是我目前的代码:
import dask
import numpy as np
# Set up client
# NOTE(review): `SLURMCluster` and `distributed` are not imported in the
# snippet as shown (only `dask` and `numpy` are) -- presumably
# `from dask_jobqueue import SLURMCluster` and `import distributed`; confirm.
# Cluster configuration: cores=1, memory='40 GB', queue 'brc', network
# interface 'em1', logs written under ./dask_logs.
cluster = SLURMCluster(cores=1, memory='40 GB',
queue='brc', interface='em1',
log_directory='./dask_logs')
# Ask the cluster to scale to two jobs.
cluster.scale(jobs=2)
# Create a distributed client attached to the cluster; the traceback further
# down shows `dask.compute` going through `distributed/client.py`.
client = distributed.Client(cluster)
# Function to be parallelised
def nT_loop(i, P, inv_DiagCe):
    """Scale column ``i`` of ``P`` element-wise by the negated ``inv_DiagCe``.

    Args:
        i: Index of the column of ``P`` to process.
        P: 2-D array; only the single column ``P[:, i]`` is read.
        inv_DiagCe: Array of scaling factors. Singleton dimensions are
            removed with ``np.squeeze`` before the multiplication, so a
            column or row vector is accepted as well as a flat array.

    Returns:
        The array ``P[:, i] * np.squeeze(-inv_DiagCe)``.
    """
    # Negate first, then squeeze, so an (n, 1) or (1, n) input multiplies
    # element-wise with the 1-D column instead of broadcasting to 2-D.
    x = P[:, i] * np.squeeze(-inv_DiagCe)
    return x
# Input data: a 64620 x 64620 random matrix and a length-64620 random vector.
P = np.random.rand(64620, 64620)
inv_DiagCe = np.random.rand(64620)
# Run loop: build one delayed task per column index; nothing is executed yet.
res1 = []
for i in range(2):
    # NOTE(review): the whole of P (roughly 33 GB as float64) is passed as an
    # argument to every delayed call; the traceback below fails while
    # pickling task arguments, so this is the likely trigger -- confirm.
    res = dask.delayed(nT_loop)(i, P, inv_DiagCe)
    res1.append(res)
# Compute results: evaluates all delayed tasks and returns a tuple of results.
res1 = dask.compute(*res1)
当我运行它时,它会给出以下错误:
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
MemoryError:
During handling of the above exception, another exception occurred:
MemoryError Traceback (most recent call last)
~/wang_model/estimation.py in <module>
208 #P[:,i] = P[:,i]* np.squeeze(-inv_DiagCe) #bsxfun(@times, P(:,i), -inv_DiagCe');
209
---> 210 res1 = dask.compute(*res1)
211 print(datetime.now().strftime("%H:%M:%S"))
~/miniconda3/envs/python38/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2703 Client.compute: Compute asynchronous collections
2704 """
-> 2705 futures = self._graph_to_futures(
2706 dsk,
2707 keys=set(flatten([keys])),
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2639 {
2640 "op": "update-graph",
-> 2641 "tasks": valmap(dumps_task, dsk),
2642 "dependencies": dependencies,
2643 "keys": list(map(tokey, keys)),
~/miniconda3/envs/python38/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/envs/python38/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
3356 return d
3357 elif not any(map(_maybe_complex, task[1:])):
-> 3358 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3359 return to_serialize(task)
3360
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
3365 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
3366 """ Dump an object to bytes, warn if those bytes are large """
-> 3367 b = dumps(obj, protocol=4)
3368 if not _warn_dumps_warned[0] and len(b) > limit:
3369 _warn_dumps_warned[0] = True
~/miniconda3/envs/python38/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x, **dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s", x, e)
~/miniconda3/envs/python38/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
75
~/miniconda3/envs/python38/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
MemoryError:
我认为这可能与'P'的大尺寸有关。有人有什么建议吗?
谢谢
【问题讨论】:
-
在先使用 `import dill as pickle` 这个技巧之后,你还能重现同样的异常吗?如有更新,感谢告知。
标签: python parallel-processing dask