【问题标题】:What all parameters to be called in a async function in python?在 python 的异步函数中要调用的所有参数是什么?
【发布时间】:2020-09-09 02:38:56
【问题描述】:
  1. 我有一个函数var。我想知道通过利用系统拥有的所有处理器、内核和 RAM 内存的多处理/并行处理来快速运行此函数中的循环的最佳方法。

    import numpy as np
    from pysheds.grid import Grid
    
    xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
    ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    def var(interest):
    
        variable_avg = []
        for (x,y) in zip(xs,ys):
            grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    
            grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label') 
    
            grid.clip_to('catch')
    
            grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    
            variablemask = grid.view('variable', nodata=np.nan)
            variablemask = np.array(variablemask)
            variablemean = np.nanmean(variablemask)
            variable_avg.append(variablemean)
        return(variable_avg)
    
    
  2. 如果我可以同时运行函数var 并针对给定的函数的多个参数在其中并行循环,那就太好了。 例如:同时调用var(a)var(b)。因为它所消耗的时间要少得多,因此只需并行处理多个坐标(xs,ys)的循环。

pysheds文档可以在here找到。

grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')处代码中使用data.tif的数据可以直接here下载。相同的数据可以在目录中以不同的名称复制,并在a = r'/home/test/image1.tif'的地方使用 和b = r'/home/test/image2.tif' 用于测试代码。

为了加快上面的代码,我得到了一个建议here,如下:

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time() 

但是,我不明白如何调用函数var_loop_async(interest, pool, loop)。实际上,我无法获得要调用的参数来代替 poolloop

我是 python 编程的新手。

如果可能的话,请将上述建议作为可重现的解决方案,以便它可以直接在 python 中运行。或者如果您有其他更好的建议来加快原始代码的速度,请告诉我。

【问题讨论】:

    标签: python parallel-processing multiprocessing python-asyncio


    【解决方案1】:

    首先,在您的原始代码中,我看到了:

    for (x,y) in zip(xs,ys):
        grid = Grid.from_raster(interest, data_name='map')
    

    我不熟悉pysheds 模块,也找不到任何关于它的文档,所以我不知道Grid.from_raster 是否是一项昂贵的操作。但似乎这条语句是在for 循环之上移动而不是在循环中重新计算的候选者。也许仅此一项就能获得显着的性能改进。您提到的链接What all parameters to be called in a async function in python? 表明,创建进程池的开销可能不足以弥补麻烦。此外,如果Grid.from_raster 很昂贵并且通过将其从循环中移除来获利,那么多处理解决方案本质上是“将其放回循环中”,通过使其对每个 x、y 对执行,从而减少多处理解决方案可能会导致性能提升。

    无论如何,要使用建议的技术运行您的代码,请参见下文。不幸的是,您不能在处理器池中同时运行process_poivar_loop_async。但请在下面进一步寻找不同的解决方案。

    import numpy
    from pysheds.grid import Grid
    from concurrent.futures.process import ProcessPoolExecutor
    import asyncio
    
    
    xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
    ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
    
    file_list = (
        r'/home/test/image1.tif',
        r'/home/test/image2.tif'
    )
    
    
    def process_point(interest, x, y):
        grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variablemask = numpy.array(variablemask)
        variablemean = np.nanmean(variablemask)
        return variablemean
    
    
    async def var_loop_async(interest, pool, loop):
        tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
        return await asyncio.gather(*tasks)
    
    
    async def main():
        loop = asyncio.get_event_loop()
        with ProcessPoolExecutor() as pool:
            tasks = [var_loop_async(file, pool, loop) for file in file_list]
            results = await asyncio.gather(*tasks)
            print(results)
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
    

    不同的解决方案

    您希望能够在进程池中为要处理的每个文件运行var,然后在子进程中处理每个 x、y 对。这意味着您需要每个处理文件的子进程都有自己的进程池来处理 x、y 对。这通常是不可能的,因为为进程池创建的进程是 daemon 进程(它们会在主进程终止时自动终止)并且不允许创建自己的子进程。为了克服这个问题,我们必须创建自己的 mutliprocessor.Pool 特化,并使用自己的池初始化每个子进程。

    但这会提高性能吗?var 子进程基本上什么都不做,除了等待process_poi 子进程完成它们的工作。所以我不希望这对以前的代码有很大的改进。而且,正如我所提到的,尚不清楚这两种多处理解决方案是否会比原始代码有所改进,尤其是经过修改以重新定位 Grid.from_raster 调用的代码。

    import numpy
    from pysheds.grid import Grid
    import functools
    from multiprocessing.pool import Pool
    import multiprocessing
    import os
    
    # This allows subprocesses to have their own pools:
    
    class NoDaemonProcess(multiprocessing.Process):
        # make 'daemon' attribute always return False
        def _get_daemon(self):
            return False
        def _set_daemon(self, value):
            pass
        daemon = property(_get_daemon, _set_daemon)
    
    class NoDaemonContext(type(multiprocessing.get_context())):
        Process = NoDaemonProcess
    
    class MyPool(multiprocessing.pool.Pool):
        def __init__(self, *args, **kwargs):
            kwargs['context'] = NoDaemonContext()
            super(MyPool, self).__init__(*args, **kwargs)
    
    
    xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
    ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    
    pool2 = None
    
    def init_pool():
        global pool2
        #pool2 = Pool(5)
        pool2 = Pool(os.cpu_count // 2) # half the available number of processors
    
    
    def process_poi(interest, x, y):
        grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variablemask = numpy.array(variablemask)
        variablemean = np.nanmean(variablemask)
        return variablemean
    
    
    def var(interest):
        task = functools.partial(process_poi, interest)
        return pool2.starmap(task, zip(xs, ys))
    
    
    def main():
        # This will create non-daemon processes so that these processes can create their own pools:
        with MyPool(2, init_pool) as pool:
            results = pool.map(var, [a, b])
            print(results)
    
    
    if __name__ == "__main__":
        main()
    

    第三种使用线程的解决方案

    使用asyncio

    import numpy
    from pysheds.grid import Grid
    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    
    xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
    ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
    
    file_list = [
        r'/home/test/image1.tif',
        r'/home/test/image2.tif'
    ]
    
    def process_point(interest, x, y):
        grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variablemask = numpy.array(variablemask)
        variablemean = np.nanmean(variablemask)
        return variablemean
    
    
    async def var_loop_async(interest, pool, loop):
        tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
        return await asyncio.gather(*tasks)
    
    
    async def main():
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor(max_workers=100) as pool:
            tasks = [var_loop_async(file, pool, loop) for file in file_list]
            results = await asyncio.gather(*tasks)
            print(results)
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
    

    替代方案:

    import numpy
    from pysheds.grid import Grid
    import functools
    from concurrent.futures import ThreadPoolExecutor
    
    
    xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
    ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    
    
    def process_poi(interest, x, y):
        grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variablemask = numpy.array(variablemask)
        variablemean = np.nanmean(variablemask)
        return variablemean
    
    
    def var(executor, interest):
        return list(executor.map(functools.partial(process_poi, interest), xs, ys))
    
    
    def main():
        with ThreadPoolExecutor(max_workers=100) as executor:
            results = list(executor.map(functools.partial(var, executor), [a, b]))
            print(results)
    
    
    if __name__ == "__main__":
        main()
    

    使用基于 OP 更新代码的线程的更新解决方案

    import numpy
    from pysheds.grid import Grid
    import functools
    from concurrent.futures import ThreadPoolExecutor
    
    
    xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
    ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
    
    file_list = (
        r'/home/test/image1.tif',
        r'/home/test/image2.tif'
    )
    
    
    def process_point(interest, x, y):
        grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variablemask = numpy.array(variablemask)
        variablemean = np.nanmean(variablemask)
        return variablemean
    
    def var(executor, interest):
        return list(executor.map(functools.partial(process_point, interest), xs, ys))
    
    
    def main():
        with ThreadPoolExecutor(max_workers=100) as executor:
            results = list(executor.map(functools.partial(var, executor), file_list))
            print(results)
    
    
    if __name__ == "__main__":
        main()
    

    【讨论】:

    猜你喜欢
    • 2021-08-07
    • 2016-04-26
    • 2021-12-25
    • 2018-03-03
    • 1970-01-01
    • 2019-01-20
    • 2014-03-20
    • 1970-01-01
    • 2021-03-10
    相关资源
    最近更新 更多