【发布时间】:2021-05-20 16:10:02
【问题描述】:
我正在尝试通过查看代码示例和文档来使用 Dask,但无法理解它的工作原理。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。
我尝试的第一件事是这样的:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client(n_workers=2)
print("hello world")
hello world 被打印了三次,我认为这是因为工人。我假设除非调用计算,否则不会启动工作人员。
我可以将我的打印语句移动到一个函数中:
if __name__ == '__main__':
client = Client(n_workers=2)
def print_func():
print("hello world")
但是,我如何确保只有一个工作人员执行此功能? (在 MPI 中,我可以使用 rank == 0 来做到这一点;我没有找到任何类似于 MPI_Comm_rank() 的东西,它可以告诉我 Dask 中的工人编号或 ID)。
我更进一步,开始使用 Dask 中提供的示例:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client()
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())
但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase。我假设dask.bag 会自动拆分计算工作。
对于一篇冗长的帖子,我深表歉意,但我无法理解 Dask(我习惯于 MPI 和 OpenMP 编程)。
【问题讨论】:
标签: python dask dask-distributed