首先,在您的原始代码中,我看到了:
for (x,y) in zip(xs,ys):
grid = Grid.from_raster(interest, data_name='map')
我不熟悉pysheds 模块,也找不到任何关于它的文档,所以我不知道Grid.from_raster 是否是一项昂贵的操作。但似乎这条语句是在for 循环之上移动而不是在循环中重新计算的候选者。也许仅此一项就能获得显着的性能改进。您提到的链接What all parameters to be called in a async function in python? 表明,创建进程池的开销可能不足以弥补麻烦。此外,如果Grid.from_raster 很昂贵并且通过将其从循环中移除来获利,那么多处理解决方案本质上是“将其放回循环中”,通过使其对每个 x、y 对执行,从而减少多处理解决方案可能会导致性能提升。
无论如何,要使用建议的技术运行您的代码,请参见下文。不幸的是,您不能在处理器池中同时运行process_poi 和var_loop_async。但请在下面进一步寻找不同的解决方案。
import numpy
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
不同的解决方案
您希望能够在进程池中为要处理的每个文件运行var,然后在子进程中处理每个 x、y 对。这意味着您需要每个处理文件的子进程都有自己的进程池来处理 x、y 对。这通常是不可能的,因为为进程池创建的进程是 daemon 进程(它们会在主进程终止时自动终止)并且不允许创建自己的子进程。为了克服这个问题,我们必须创建自己的 mutliprocessor.Pool 特化,并使用自己的池初始化每个子进程。
但这会提高性能吗?var 子进程基本上什么都不做,除了等待process_poi 子进程完成它们的工作。所以我不希望这对以前的代码有很大的改进。而且,正如我所提到的,尚不清楚这两种多处理解决方案是否会比原始代码有所改进,尤其是经过修改以重新定位 Grid.from_raster 调用的代码。
import numpy
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os
# This allows subprocesses to have their own pools:
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
class MyPool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(MyPool, self).__init__(*args, **kwargs)
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
pool2 = None
def init_pool():
global pool2
#pool2 = Pool(5)
pool2 = Pool(os.cpu_count // 2) # half the available number of processors
def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(interest):
task = functools.partial(process_poi, interest)
return pool2.starmap(task, zip(xs, ys))
def main():
# This will create non-daemon processes so that these processes can create their own pools:
with MyPool(2, init_pool) as pool:
results = pool.map(var, [a, b])
print(results)
if __name__ == "__main__":
main()
第三种使用线程的解决方案
使用asyncio:
import numpy
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = [
r'/home/test/image1.tif',
r'/home/test/image2.tif'
]
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=100) as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
替代方案:
import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(executor, interest):
return list(executor.map(functools.partial(process_poi, interest), xs, ys))
def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), [a, b]))
print(results)
if __name__ == "__main__":
main()
使用基于 OP 更新代码的线程的更新解决方案
import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(executor, interest):
return list(executor.map(functools.partial(process_point, interest), xs, ys))
def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), file_list))
print(results)
if __name__ == "__main__":
main()