【问题标题】:limit number of CPUs used by dask compute限制 dask 计算使用的 CPU 数量
【发布时间】:2021-11-22 12:43:48
【问题描述】:

以下代码使用 appx 1 秒在 8-CPU 系统上执行。如何手动将dask.compute 使用的 CPU 数量配置为 4 个 CPU,这样即使在 8-CPU 系统上,以下代码也将使用 appx 2 秒来执行?

import dask
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)

【问题讨论】:

    标签: python dask dask-distributed


    【解决方案1】:

    有几个选项:

    1. 在创建集群时指定工作人员的数量
    from dask.distributed import Client
    
    # without specifying unique thread, the function is executed
    # on all threads
    client = Client(n_workers=4, threads_per_worker=1)
    
    # the rest of your code is not changed
    
    1. 指定应执行任务的工作人员数量(以及哪些)
    
    client = Client(n_workers=8, threads_per_worker=1)
    
    list_workers = list(client.scheduler_info()['workers'])
    
    client.compute(objs, workers=list_workers[:4]) 
    
    # submit only to the first 4 workers
    # note that workers should still be single-threaded, but the difference
    # from option 1 is that you could in principle have more workers
    # that are idle, also the `workers` kwarg can be passed to
    # dask.compute rather than client.compute
    
    1. 指定一个信号量
    from dask.distributed import Client, Semaphore
    
    client = Client()
    sem = Semaphore(max_leases=4, name="foo")
    
    def fmodified(x, sem):
        with sem:
            return f(x)
    
    objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
    print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)
    

    更新:正如 @mdurant 在 cmets 中所指出的,如果您在脚本中运行它,则需要 if __name__ == "main": 来保护相关代码不被工作人员执行。例如,上面列表中的第二个选项在脚本中如下所示:

    #!/usr/bin/env python3
    import dask
    from dask.distributed import Client
    from time import sleep
    
    def f(x):
        sleep(1)
        return x**2
    
    objs = [dask.delayed(f)(x) for x in range(8)]
    
    if __name__ == "main":
        client = Client(n_workers=8, threads_per_worker=1)
    
        list_workers = list(client.scheduler_info()['workers'])
    
        results = client.compute(objs, workers=list_workers[:4])
    
        print(results)
    

    【讨论】:

    • 选项 (1) 仍然使用所有 8 个 CPU(大约 1 秒执行)。认为我需要将dask.compute(*objs) 更改为client.compute(*objs),但导致TypeError: Truth of Delayed objects is not supported。选项 (2) 需要一个 client 对象,该对象是从选项 (1) 创建的,但随后导致 TypeError: compute() got multiple values for argument 'workers'。选项 (3) 有效,但由于附加功能而不是首选。对 (1) 或 (2) 进行任何修改以使其运行?
    • 还经历了间歇性的RuntimeError 与所有三个选项:RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.
    • 嗯,对不起……让我检查一下……
    • 我之前没有注意到单线程行为......所以这很有趣。回复:freeze_support 消息,我从未见过这种情况......所以可能特定于您的代码/设置(也许您在 Windows 上运行)
    • 冻结的事情是当您将此代码保存到文件中时,而不是在交互式会话或笔记本中运行。在文件中,您需要一个 if __name__ == "main": 块,否则每个子进程都会导入该文件并尝试启动更多工作人员。
    猜你喜欢
    • 2020-05-08
    • 1970-01-01
    • 2022-08-02
    • 2011-11-20
    • 1970-01-01
    • 1970-01-01
    • 2020-03-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多