【问题标题】:Python aiohttp: cancel async execution on met conditionPython aiohttp:在满足条件时取消异步执行
【发布时间】:2019-01-22 07:39:54
【问题描述】:

我为一个看起来像这样的 CTF 游戏编写了一个异步蛮力脚本

async def bound_fetch(sem, session, answer):
    #  generating url, headers and json ...
    async with sem, session.post(url=url, json=json, headers=headers) as response:
        if response.status == 200:
            print('Right answer found: %s' % json['answer'])


async def run(words):
    tasks = []
    sem = asyncio.Semaphore(3)
    async with aiohttp.ClientSession() as session:
        for word in words:
            task = asyncio.create_task(bound_fetch(sem=sem, session=session, answer=''.join(word)))
            tasks.append(task)
        print("Generated %d possible answers. Checking %s" % (len(tasks), base_url))
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(possible_answers))
    loop.run_until_complete(future)

我的参考是本教程:https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

我想知道这是否是在 aiohttp 中执行此操作的正确方法,或者我是否让事情变得过于复杂(因为我不需要处理所有响应,只需知道哪个响应的状态为 200)?满足条件(状态码)如何取消处理?

【问题讨论】:

  • 好的,当满足条件时,我设法取消了raise StopIteration 的执行,而不是await asyncio.gather(*tasks) 我做await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)。如果有人知道更优雅的方式,请发布答案:)
  • 您可能想要提出除StopIteration 以外的其他内容——因为StopIteration 被Python 使用,所以将其用作业务异常并不是一个好主意。我发布了一个答案,展示了如何使用 Event 进行同步。

标签: python python-3.x python-asyncio aiohttp


【解决方案1】:

我想知道这是否是在 aiohttp 中执行此操作的正确方法

您的代码相当地道。在顶层,您可以省略 asyncio.ensure_future 并直接调用 asyncio.run(run(possible_answers))

满足条件(状态码)时如何取消处理?

您可以使用事件或未来对象并等待它,而不是使用gather。您可能知道,运行协程不需要gather(它们按照create_task 的调度运行),其明确目的是等待所有协程完成。基于Event 的同步可能如下所示:

async def bound_fetch(sem, session, answer, done):
    #  generating url, headers and json ...
    async with sem, session.post(url=url, json=json, headers=headers) as response:
        if response.status == 200:
            done.set()
            done.run_answer = json['answer']

async def run(words):
    sem = asyncio.Semaphore(3)
    done = asyncio.Event()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for word in words:
            tasks.append(asyncio.create_task(bound_fetch(
                sem=sem, session=session, answer=''.join(word), done=done)))
        print("Generated %d possible answers. Checking %s" % (len(words), base_url))
        await done.wait()
        print('Right answer found: %s' % done.run_answer)
        for t in tasks:
            t.cancel()

【讨论】:

  • 非常感谢您的回复。这看起来确实更干净。我的代码编辑器抱怨在 asyncio.create_task() 之前没有等待,但我认为这是故意的,所以它不是同步的?顺便说一句,我不确定信号量是否有效——我在这里得到了很多联系。我可以以某种方式一次记录活动连接的数量吗?
  • 嗯,看起来程序以“RuntimeError: Session is closed 从未检索到任务异常”的方式终止。应该是这样吗?
  • @Phil 我认为您的 IDE 有点过分热心。尽管如此,由于您的第二句话,我修改了代码以在找到答案后取消任务。这应该可以解决这两个问题,因为现在使用了 create_task 的返回值,并且在退出主协程之前取消了任务。 (我怀疑asyncio.run 试图“完成”事件循环,导致剩余任务尝试对现已关闭的会话执行某些操作。)请检查这是否解决了问题。
  • 是的,它现在可以正常工作了。你能确认这里的信号量使用正确吗?我从 Pawel 的网站上复制了它,我知道 Dijkstra 在信号量背后的理论,但我不确定我是否正确使用它。例如:tutorialedge.net/python/concurrency/… auther 在信号量上由单个工作人员使用 acquire() 和 release() ,而我的代码中缺少这一点
  • @Phil 您对信号量的使用很好,因为async with semaphore 在进入with 块时自动执行await semaphore.acquire(),在退出时自动执行semaphore.release()。这比手动调用acquirerelease 更可靠,因为它会在出现异常或其他过早退出函数的情况下正确释放信号量。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-31
  • 2013-01-11
  • 1970-01-01
  • 2020-09-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多