【Question title】: Custom asyncio executor
【Posted】: 2021-11-26 02:57:53
【Question】:

I need to implement an algorithm using asyncio with the following conditions (behavior):

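  1. Check whether the argument list is empty; if it is, finish execution
  2. Pop the next argument from the argument list, create a coroutine with this argument and schedule it for execution
  3. Execute no more than 'async_level' coroutines "concurrently"
  4. When a coroutine completes execution -> go to step 1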

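It's not necessary to schedule all tasks for execution at once (as asyncio.gather does); instead, they should be scheduled in parts: when a task finishes execution, a new task takes its place.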

I tried to implement this with asyncio.as_completed(), but it doesn't actually work as expected:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

tasks = {asyncio.create_task(job(param)) for param in params[0: async_level]}
params = iter(params[async_level:])

while True:
    # NOTE: This won't work: as_completed() takes a snapshot of 'tasks' when it is called, so tasks added later are ignored and execution ends when the last of the original coroutines finishes
    for task in asyncio.as_completed(tasks):
        print(f"len(tasks) = {len(tasks)}")
        await task

        try:
            param = next(params)
            tasks.add(asyncio.create_task(job(param)))
        except StopIteration:
            print("StopIteration")

    break

I also tried to implement it with asyncio.BoundedSemaphore, but it doesn't satisfy the first two conditions:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

async def semaphore_job(name, _asyncio_semaphore):
    async with _asyncio_semaphore:
        await job(name)

asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedules all jobs at once, which is a significant drawback because the number of jobs can be overwhelming
for param in params:
    jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)

I'd appreciate any help.

【Question comments】:

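  • I don't understand what exactly the problem is with the semaphore solution you proposed. I know it schedules all the jobs in advance, but most of them will immediately suspend on the semaphore before starting any actual work. A task is a relatively small Python object, so unless you have millions of them, they shouldn't overwhelm asyncio. Do you have a specific problem with that version, or are you trying to optimize memory usage by not creating the jobs in advance?
  • Not millions, but hundreds of thousands. I know I can use the BoundedSemaphore version, but I don't want to schedule all tasks in advance, only when necessary. Using hundreds of megabytes of RAM (about 200 bytes per task) just to store tasks seems crazy, so I was looking for a more elegant solution.
  • Fair enough, I just wanted to check what was wrong with the "obvious" solution.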

Tags: python async-await python-asyncio


【Solution 1】:

It looks like I found a solution myself:

import asyncio
from typing import Callable
from random import randrange
from asyncio import Semaphore, ensure_future, get_event_loop


async def job(name, time_range=10):
    timeout = randrange(time_range)
    print(f"Task '{name}' started with timeout {timeout}")
    await asyncio.sleep(timeout)
    print(f"Task '{name}' finished")
    return name


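async def custom_executor(func: Callable, args: list, async_level: int = 4):
    """ Asynchronously executes no more than 'async_level' callables specified by 'func' with corresponding 'args' """
    loop = get_event_loop()
    sync = Semaphore()  # used as a "some job finished" signal for the loop below
    # Reversed list so pop() is O(1) and args run in their original order;
    # unlike set(args), this keeps duplicates and accepts unhashable args
    todo = list(reversed(args))
    doing = set()

    def _schedule_task():
        # Start the next job if any arguments are left
        if todo:
            arg = todo.pop()
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            f = ensure_future(fr, loop=loop)
            f.add_done_callback(_on_completion)
            doing.add(f)

    def _on_completion(f):
        # A job finished: wake up the loop below and refill the freed slot
        doing.remove(f)
        sync.release()
        _schedule_task()

    # Fill the initial 'async_level' slots
    for _ in range(min(async_level, len(todo))):
        _schedule_task()

    # Keep waiting until no jobs are running (and so none are left to start)
    while True:
        if not doing:
            break

        await sync.acquire()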


async def main():
    await custom_executor(job, [(1, 3), 7, (8, 2), 12, 5])

if __name__ == '__main__':
    asyncio.run(main())

But if you know a better way, please share!

【Comments】:

    【Solution 2】:

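    You can create a fixed number of workers and use a queue to hand them tasks. It's a bit shorter, and I find it easier to reason about than the callback-based code. But YMMV.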

    import asyncio

    async def custom_executor(func, args, async_level=4):
        queue = asyncio.Queue(1)
        async def worker():
            while True:
                arg = await queue.get()
                try:
                    fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
                    await fr
                except Exception as exc:
                    # Keep the worker alive: if it died, fewer workers would remain
                    # and the put()/join() calls below could block forever
                    print(f"Job for {arg!r} failed: {exc!r}")
                finally:
                    queue.task_done()  # always mark the item done so join() can return

        # create the workers
        workers = [asyncio.create_task(worker()) for _ in range(async_level)]

        # Feed the workers tasks. Since the queue is bounded, this will also
        # wait for previous tasks to finish, similar to what you wanted to
        # achieve with as_completed().
        for x in args:
            await queue.put(x)
        await queue.join()  # wait for the remaining tasks to finish

        # cancel the now-idle workers
        for w in workers:
            w.cancel()
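A further variation on the worker idea, sketched here as an assumption rather than as part of the answer above: next() on a plain iterator never awaits, so the workers can pull arguments straight from one shared iterator, which removes the queue, the join() and the final cancellation. The names custom_executor_iter and job() are hypothetical, and for brevity each argument is passed to func as a single positional argument.

```python
import asyncio


async def job(param):
    # Hypothetical stand-in for the question's job()
    await asyncio.sleep(0.01)
    return param * 2


async def custom_executor_iter(func, args, async_level=4):
    """Run func(arg) for every arg with at most async_level calls in flight.

    All workers share one iterator; next() never awaits, so no two
    workers can receive the same argument."""
    args = iter(args)
    results = []

    async def worker():
        # Each worker loops until the shared iterator is exhausted
        for arg in args:
            results.append(await func(arg))

    await asyncio.gather(*(worker() for _ in range(async_level)))
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(custom_executor_iter(job, range(1, 11)))))
```

Nothing is buffered here: a worker fetches its next argument only after its previous job finishes, which matches the "one finishes, the next one starts" behavior described in the question. Results arrive in completion order, not argument order.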
    

    【Comments】:
