【问题标题】:Cancel pending asyncio tasks if one completes with desired result如果完成并获得所需结果,则取消挂起的异步任务
【发布时间】:2021-11-29 02:14:21
【问题描述】:

下面的示例纯粹是理论上的,用于传达我在这里想要实现的目标。

我有几个名字——杰克、爱丽丝、鲍勃

其中一个人有姓——墨菲

我可以进行网络调用来检索全名。一旦我找到姓“墨菲”的人,我就很高兴。


async def get_persons():
    persons = await asyncio.gather(
                       get_person("Jack"),
                       get_person("Alice"),
                       get_person("Bob"))

    for person in persons:
        if person.surname == "Murphy":
            return person
    

def main():
    person = asyncio.run(get_persons())
    print(f"{person.first_name} has surname {person.last_name}")

当然这里的问题是我们必须等待所有 3 个请求完成。

所以最短等待时间是所有 3 个请求的最长请求时间。

有 3 个网络请求。

假设第一个需要 3 秒,第二个需要 1 秒,第三个需要 6 秒。

运行此代码需要 6 秒。

但是我们看到第二个请求(Alice)的姓氏是 Murphy,并且显然在 1 秒后完成。

我们可以基本上忽略其他两个网络请求,并在此时返回吗?

所以最终,整个过程需要 1 秒,而不是 6 秒。

编辑:

(代码更新以反映 Ajax1234 的解决方案)

class Persons:

    def __init__(self):
        self.p = []

    def get_person_request(self, name):
        if name == "Alice":
            print("Searching Alice")
            time.sleep(6)
            print("Returning Alice")
            return {'firstname': "Alice", 'surname': "Donnelly"}
        if name == "Bob":
            print("Searching Bob")
            time.sleep(3)
            print("Returning Bob")
            return {'firstname': "Bob", 'surname': "Murphy"}
        if name == "Jack":
            print("Searching Jack")
            time.sleep(8)
            print("Returning Jack")
            return {'firstname': "Jack", 'surname': "Connell"}
        return None

    async def get_person(self, n, _id):
        # the process for checking if the request response returns a person with the target surname
        if (person := self.get_person_request(n))["surname"] == "Murphy":
            for i, a in self.p:
                if i != _id:
                    a.cancel()
        return person

    async def get_persons(self, names):
        print("Setting tasks...")
        self.p = [(i, asyncio.create_task(self.get_person(a, i)))
                  for i, a in enumerate(names)]
        print("Gathering async results...")
        persons = await asyncio.gather(*[a for _, a in self.p])
        return [person for person in persons if isinstance(person, dict)][0]

def test():
    val = asyncio.run(Persons().get_persons(['Bob', 'Alice', 'Jack']))
    print(val)

脚本的输出看起来像

Setting tasks...
Gathering async results...
Searching Bob
Returning Bob
asyncio.exceptions.CancelledError

我希望输出看起来像

Setting tasks...
Gathering async results...
Searching Bob
Searching Alice
Searching Jack
Returning Bob
{'firstname': 'Bob', 'surname': 'Murphy'}

这里有两个问题:

  • 为什么每个 get_person 任务没有异步运行?
  • 如何处理gather() 不允许取消任务的异常?

【问题讨论】:

  • 如果您只是等待gather(),他们甚至会在您参加测试之前返回。但是,您可以将get_person 包装在一个函数中,该函数在找到肯定结果时设置一个标志,并在您的 main 循环中检查它,在设置标志时取消剩余的任务。
  • 我们如何取消剩余的任务?
  • asyncio.as_completed 能满足您的需求吗?

标签: python async-await python-asyncio


【解决方案1】:

您可以使用asyncio.create_task 来生成并行运行的可取消任务。你可以把这些任务存成一个列表,然后当get_person记录了"Murphy"的姓氏时,其余的都可以取消。

根据您的完整示例编辑解决方案:

import asyncio, time
class Persons:
   def __init__(self):
      self.p = []
   async def get_person_request(self, name):
     if name == "Alice":
         print("Searching Alice")
         await asyncio.sleep(6)
         print("Returning Alice")
         return {'firstname': "Alice", 'surname': "Donnelly"}
     if name == "Bob":
         print("Searching Bob")
         await asyncio.sleep(1)
         print("Returning Bob")
         return {'firstname': "Bob", 'surname': "Murphy"}
     if name == "Jack":
         print("Searching Jack")
         await asyncio.sleep(3)
         print("Returning Jack")
         return {'firstname': "Jack", 'surname': "Connell"}
   async def get_person(self, n, _id):
      if (person:=await self.get_person_request(n))["surname"]  == "Murphy": #the process for checking if the request response returns a person with the target surname
         for i, a in self.p:
            if i != _id:
               a.cancel()
      return person
   async def get_persons(self, names):
      self.p = [(i, asyncio.create_task(self.get_person(a, i))) for i, a in enumerate(names)]
      return await asyncio.gather(*[a for _, a in self.p], return_exceptions=True)

t = time.time()
asyncio.run(Persons().get_persons(['Jack', 'Alice', 'Bob']))
print(time.time() - t)

输出:

1.0074191093444824 #taking ~1 second to produce the desired result, as expected

【讨论】:

  • 您好 Ajax1234,我尝试了您的解决方案。我认为它已经接近了,你能看到我的问题的编辑以提供一些关于我所看到的后续细节吗?知道那里出了什么问题
  • @GregPeckory 请查看我最近的编辑,因为我做了一些更改以反映您最近的代码。最终,您必须使用 asyncio.sleep 来反映非阻塞、异步网络请求的行为方式,而不是使用 time.sleep,后者会阻塞事件循环。另外,我抑制了来自asyncio.gather 的异常,因此来自get_persons 的结果将返回带有目标姓氏和已取消任务的字典。我使用您的原始基准对这个过程进行了计时,该过程在大约 1 秒内完成
  • 太棒了,非常感谢!
【解决方案2】:

这是一个简单的示例,说明您可以做什么。当然,在这种情况下,没有实际的服务请求,只是睡眠 - 但无论如何.cancel() 应该可以工作。

from random import choice, randint
from datetime import datetime
import asyncio


async def retrieve_person():
    # this just generates a random first and last name combo
    first = choice(['Alice', 'Bob', 'Charlie', 'Dave'])
    # giving 'Murphy' a decent chance of showing up
    last = choice(['Baker', 'Murphy', 'Smith', 'Murphy'])
    # anywhere between 3 and 8 seconds for each 'request'
    duration = randint(3, 8)
    print(f'Taking {duration} seconds to get {first} {last}')
    await asyncio.sleep(duration)
    return {'first': first, 'last': last}


async def main():
    # kick off all the asynchronous tasks, without knowing which will finish
    # first and whether any of them will get us a result we actually need.
    aws = [
        asyncio.create_task(retrieve_person()),
        asyncio.create_task(retrieve_person()),
        asyncio.create_task(retrieve_person())
    ]
    print(f'Starting {datetime.now()}')
    person = None
    for coro in asyncio.as_completed(aws):
        person = await coro
        if person['last'] == 'Murphy':
            # cancel the rest and stop looping
            for other_coro in aws:
                other_coro.cancel()
            break
        else:
            person = None
    print(f'Done {datetime.now()}: {person}')


asyncio.run(main())

如果你不幸看到一个带有“墨菲”的清晰示例,你可能需要运行几次,我没有努力总是包含一个,但代码确实表明,如果没有正面,最后一个请求只会给你一个None

示例输出:

Starting 2021-10-10 14:23:26.764063
Taking 8 seconds to get Alice Murphy
Taking 6 seconds to get Bob Murphy
Taking 3 seconds to get Alice Baker
Done 2021-10-10 14:23:32.762239: {'first': 'Bob', 'last': 'Murphy'}

请注意,在此示例中,可能有两个匹配项,但 Bob 在大约 6 秒后胜出,程序完成,从未到达 Alice。

【讨论】:

    猜你喜欢
    • 2013-12-27
    • 2015-08-05
    • 1970-01-01
    • 2021-09-09
    • 1970-01-01
    • 1970-01-01
    • 2020-04-25
    • 1970-01-01
    • 2016-02-23
    相关资源
    最近更新 更多