【问题标题】:Trigger Dask workers to release memory触发 Dask worker 释放内存
【发布时间】:2019-04-30 15:03:02
【问题描述】:

我正在使用 Dask 分发一些函数的计算。我的总体布局如下所示:


    from dask.distributed import Client, LocalCluster, as_completed

    cluster = LocalCluster(processes=config.use_dask_local_processes,
                           n_workers=1,
                           threads_per_worker=1,
                           )
    client = Client(cluster)
    cluster.scale(config.dask_local_worker_instances)

    work_futures = []

    # For each group do work
    for group in groups:
        fcast_futures.append(client.submit(_work, group))

    # Wait till the work is done
    for done_work in as_completed(fcast_futures, with_results=False):
        try:
            result = done_work.result()
        except Exception as error:
            log.exception(error)

我的问题是,对于大量作业,我往往会达到内存限制。我看到了很多:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 1.15 GB -- Worker memory limit: 1.43 GB

似乎每个未来都没有释放它的记忆。我怎样才能触发它?我在 Python 2.7 上使用 dask==1.2.0。

【问题讨论】:

    标签: dask dask-distributed


    【解决方案1】:

    只要客户端有指向它的未来,调度程序就会对结果提供帮助。当(或不久之后)最后一个未来被 python 垃圾收集时,内存被释放。在您的情况下,您在整个计算过程中将所有期货保存在一个列表中。您可以尝试修改循环:

    for done_work in as_completed(fcast_futures, with_results=False):
        try:
            result = done_work.result()
        except Exception as error:
            log.exception(error)    
        done_work.release()
    

    或将 as_completed 循环替换为在处理完期货后从列表中显式删除的内容。

    【讨论】:

    • 我正在尝试查找已记录的 release() API。我没有在 concurrent.futures 或 Dask 分布式下看到它。我是否以某种方式想念它?
    • 如果从任务中返回 None ,在保留期货时是否应该有任何内存使用(只保留状态和一些元数据?)
    • 是的,具体对象仍然保存在一个工人身上,以及该工人的内部字典中引用它的一些条目。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-24
    • 2019-05-23
    • 2018-09-17
    • 1970-01-01
    • 2018-06-13
    • 1970-01-01
    相关资源
    最近更新 更多