【问题标题】:How can I recall "task" if task is failed?如果任务失败,我如何召回“任务”?
【发布时间】:2021-06-05 15:29:02
【问题描述】:

函数make_request 向API 发出http 请求。而且我每秒最多只能发出 3 个请求。

我做过类似的事情

coroutines = [make_request(...) for ... in ...]
tasks = []
for coroutine in coroutines:
   tasks.append(asyncio.create_task(coroutine))
   await asyncio.sleep(1 / 3)
responses = asyncio.gather(*tasks)

但我每小时发出的请求也不能超过 1000 个。 (也许,我可以延迟3600 / 1000。)如果互联网连接丢失怎么办?我应该尝试再次提出请求。

我可以这样包装make_request

async def try_make_request(...):
   while True:
      try:
         return await make_request(...)
      exception APIError as err:
         logging.exception(...)

在这种情况下,每秒可能会发出超过 3 个请求。

我找到了that 解决方案,但我不确定这是不是最好的解决方案

pending = []
coroutines = [...]
for coroutine in coroutines:
    pending.append(asyncio.create_task(coroutine))
    await asyncio.sleep(1 / 3)
result = []
while True:
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in finished:
        exc = task.exception()
        if isinstance(exc, APIError) and exc.code == 29:
            pending.add(task.get_coro()) # since python 3.8
        if exc:
            logging.exception(...)
        else:
            result.append(task.result())
    if not pending:
        break

【问题讨论】:

  • 使用task.get_coro()的解决方案看起来很奇怪,它甚至可以工作吗?如果协程已经引发并因此完成,我认为您不能直接等待它,也不能通过新任务等待它。

标签: python python-3.x asynchronous async-await python-asyncio


【解决方案1】:

如果我正确理解该要求,则您启动连接的频率不得超过 3.6 秒。实现此目的的一种方法是设置一个计时器,该计时器在每次启动连接时重置,并在 3.6 秒后到期,从而允许启动下一个连接。例如:

class Limiter:
    def __init__(self, delay):
        self.delay = delay
        self._ready = asyncio.Event()
        self._ready.set()

    async def wait(self):
        # wait in a loop because if there are multiple waiters,
        # the wakeup can be spurious
        while not self._ready.is_set():
            await self._ready.wait()
        # We got the slot and can proceed with the download.
        # Before doing so, clear the ready flag to prevent other waiters
        # from commencing downloads until the delay elapses again.
        self._ready.clear()
        asyncio.get_event_loop().call_later(self.delay, self._ready.set)

那么try_make_request 可能看起来像这样:

async def try_make_request(limiter, ...):
    while True:
        await limiter.wait()
        try:
            return await make_request(...)
        exception APIError as err:
            logging.exception(...)

...主协程可以并行等待try_make_requests:

limiter = Limiter(3600/1000)
responses = await asyncio.gather(*[try_make_request(limiter, ...) for ... in ...])

【讨论】:

  • 谢谢!应该有self._ready.is_set() 而不是self._ready.get()
  • @RoyKing 是的,我现在修改了答案,谢谢。
猜你喜欢
  • 2011-07-20
  • 1970-01-01
  • 2011-06-15
  • 2017-10-18
  • 1970-01-01
  • 2021-04-20
  • 1970-01-01
  • 1970-01-01
  • 2017-01-30
相关资源
最近更新 更多