【发布时间】:2021-11-26 02:57:53
【问题描述】:
我需要在以下条件(行为)下使用 asyncio 来实现算法:
- 检查参数列表是否为空,如果为空则结束执行
- 从参数列表中弹出下一个参数创建协程
- 这个参数并安排它“同时”执行是不可能的
- 协程完成时执行的协程不超过 'async_level' 个
- 执行 -> 转到步骤 1
不必计划一次完成所有任务(如 asyncio.gather),而是分部分完成。当下一个任务完成执行时,一个新的任务会取代它。
我尝试使用 asyncio.as_completed() 来实现,但实际上并没有按预期工作:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]
tasks = {asyncio.create_task(job(param)) for param in params[0: async_level]}
params = iter(params[async_level:])
while True:
# NOTE: It wont work, because you can't add task in 'tasks' after 'as_completed' is invoked, so execution actually ends when the last coroutine in the 'as_completed' ends
for task in asyncio.as_completed(tasks):
print(f"len(tasks) = {len(tasks)}")
await task
try:
param = next(params)
tasks.add(asyncio.create_task(job(param)))
except StopIteration:
print("StopIteration")
break
另外,我尝试使用 asyncio.BoundedSemaphore 来实现它,但不满足前两个条件:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]
async def semaphore_job(name, _asyncio_semaphore):
async with _asyncio_semaphore:
await job(name)
asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedule all jobs at ones and it's significant drawback because the count of jobs can be overwhelmed
for param in params:
jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)
如果您能提供任何帮助,我将不胜感激。
【问题讨论】:
-
我不明白您提出的信号量解决方案的问题究竟出在哪里。我知道它会提前安排所有工作,但大多数工作会在开始实际工作之前立即挂起信号量。任务是一个相对较小的 Python 对象,因此除非您拥有数百万个任务,否则它们不应该压倒 asyncio。您是否对该版本有特定问题,或者您是否试图通过不提前创建作业来优化内存使用?
-
不是数百万,而是数十万。我知道我可以使用 BoundedSemaphore 版本,但我不想提前安排所有任务,但只有在必要时才可以。使用数百兆字节的 RAM(任务 == 200 字节)来存储任务似乎很疯狂,我寻找了一个更优雅的解决方案。
-
很公平,我只是想检查一下“明显”的解决方案出了什么问题。
标签: python async-await python-asyncio