有几个选项:
- 在创建集群时指定工作人员的数量
from dask.distributed import Client
# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=4, threads_per_worker=1)
# the rest of your code is not changed
- 指定应执行任务的工作人员数量(以及哪些)
client = Client(n_workers=8, threads_per_worker=1)
list_workers = list(client.scheduler_info()['workers'])
client.compute(objs, workers=list_workers[:4])
# submit only to the first 4 workers
# note that workers should still be single-threaded, but the difference
# from option 1 is that you could in principle have more workers
# that are idle, also the `workers` kwarg can be passed to
# dask.compute rather than client.compute
- 指定一个信号量
from dask.distributed import Client, Semaphore
client = Client()
sem = Semaphore(max_leases=4, name="foo")
def fmodified(x, sem):
with sem:
return f(x)
objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
print(dask.compute(*objs)) # (0, 1, 4, 9, 16, 25, 36, 49)
更新:正如 @mdurant 在 cmets 中所指出的,如果您在脚本中运行它,则需要 if __name__ == "main": 来保护相关代码不被工作人员执行。例如,上面列表中的第二个选项在脚本中如下所示:
#!/usr/bin/env python3
import dask
from dask.distributed import Client
from time import sleep
def f(x):
sleep(1)
return x**2
objs = [dask.delayed(f)(x) for x in range(8)]
if __name__ == "main":
client = Client(n_workers=8, threads_per_worker=1)
list_workers = list(client.scheduler_info()['workers'])
results = client.compute(objs, workers=list_workers[:4])
print(results)