【问题标题】:dask compute dict of delayed functions延迟函数的 dask 计算字典
【发布时间】:2021-12-25 06:42:44
【问题描述】:

我想并行化这段代码:

-        "mean": float(zonal_extract.mean().compute()),
-        "min": float(zonal_extract.min().compute()),
-        "max": float(zonal_extract.max().compute()),
-        "sum": float(zonal_extract.sum().compute()),
-        "stddev": float(zonal_extract.std().compute()),
-        "var": float(zonal_extract.var().compute()),

这是我第一次尝试在 python 中并行化某些东西,它不是被一遍又一遍地调用的同一个函数。这将是相同的数据,不同的功能。

尝试1

from dask import compute, delayed


results = delayed({})
results["mean"] = zonal_extract.mean
results["min"] = zonal_extract.min
results["max"] = zonal_extract.max
results["sum"] = zonal_extract.sum
results["stddev"] = zonal_extract.std
results["var"] = zonal_extract.var
results = compute(results, num_workers=4)  # , scheduler='processes'
results = {k: float(v) for k, v in results.items()}

尝试2

mean, min, max, sum, stddev, var = compute(
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=4,
)  # , scheduler='processes'
results = {k: float(v) for k, v in dict(mean, min, max, sum, stddev, var).items()}

这似乎是一项简单的任务,但我找不到任何可行的方法。也许是因为我已经在多处理上下文和嵌套线程中(这可能是不存在但听起来很酷的东西)或者是错误:

    L = Parallel(n_jobs=-1)(
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/local/lib/python3.9/dist-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
TypeError: Delayed objects of unspecified length are not iterable

real    0m25.048s
user    0m46.943s

编辑:

哦,那是因为延迟函数覆盖了joblib的

from dask import compute, delayed
from joblib import Parallel, delayed

【问题讨论】:

标签: python dask python-xarray


【解决方案1】:

主要问题是我导入了两个具有相同函数名的东西。改变这个

from dask import compute, delayed
from joblib import Parallel, delayed

到这里

import dask
from joblib import Parallel, delayed

然后第二次尝试代码开始工作

    mean, min, max, sum, stddev, var = dask.compute(
        zonal_extract.mean(),
        zonal_extract.min(),
        zonal_extract.max(),
        zonal_extract.sum(),
        zonal_extract.std(),
        zonal_extract.var(),
        num_workers=3,
    )

    results = {
        k: float(v)
        for k, v in dict(
            mean=mean, min=min, max=max, sum=sum, stddev=stddev, var=var
        ).items()
    }

但如果有人有办法在 dask 中实际使用 dicts 而不是三次命名,我会很乐意接受这个答案

【讨论】:

    【解决方案2】:

    dask.compute 会为你递归成字典。

    你可以这样写:

    results = dict(
        mean=dask.delayed(zonal_extract.mean)(),
        min=dask.delayed(zonal_extract.min)()
        # and more
    )
    
    results = dask.compute(results)[0]
    

    基本思想是您可以将延迟计算嵌套到您传递给dask.compute 的元组、列表、字典等中。这里需要的只是从函数调用中生成成熟的延迟对象。

    在不重复自己方面,我们可以更“高效”:

    
    computations = {k: dask.delayed(getattr(zonal_extract, k))()
                    for k in "mean min max sum std var".split()}
    results = dask.compute(computations)[0]
    
    

    如果我退后一步,我想这种并行化的级别似乎太低了——这些都是算术运算中不那么密集的聚合,它们都遍历相同的数据来完成它。 var 只是 std 的平方,从这个意义上说,加速更简单。

    【讨论】:

    • 我相信 .min() 和其他类方法可能已经是 rioxarray 中的延迟函数,即使它们看起来与 pandas 相同,但它们需要调用 .compute() 才能对其进行评估。但我同意你所说的一切。似乎单线程光栅的性能要好得多,我应该跨数据并行化(分割区域)而不是函数。在我的区域统计案例中,与单线程 rasterio 相比,Dask/rioxarray 占用的 CPU 时间多 38 倍,实时性多 2 倍(但我认为如果我的 RAM 用完,rioxarray 可以处理更大的文件)
    【解决方案3】:

    @creanion 的回答很好,但我还要指出,不需要将 mean()var()stddev() 等操作包装在 dask.delayed 对象中:这些已经是惰性操作了,直接调用dask.compute()就可以了。

    因此,没有 delayed 包装器的最小示例是:

    import dask
    import dask.array as da
    
    # Generate some fake data
    zonal_extract = da.random.uniform(size=(100,), chunks=10)
    
    summary_stats = {
        "mean": zonal_extract.mean(),
        "std": zonal_extract.std(),
        "var": zonal_extract.var(),
        "min": zonal_extract.min(),
        "max": zonal_extract.max(),
    }
    
    # traverse=True is default, but being explicit
    summary_stats_computed, = dask.compute(summary_stats, traverse=True)
    

    产生(用我的随机数滚动):

    {'mean': 0.4903848677019127,
     'std': 0.30733105780457826,
     'var': 0.09445237909128101,
     'min': 0.000996718178509548,
     'max': 0.9981326789252434}
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-04-04
      • 2022-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-07-14
      • 1970-01-01
      • 2013-12-15
      相关资源
      最近更新 更多