【问题标题】:Python asyncio (aiohttp, aiofiles)Python asyncio (aiohttp, aiofiles)
【发布时间】:2018-06-16 19:11:17
【问题描述】:

我似乎很难理解 pythons asyncio。我没有编写任何代码,因为我看到的所有示例都是一次性运行的。创建一些协程,将它们添加到事件循环中,然后运行循环,它们运行在它们之间切换的任务,完成。这对我来说似乎没什么帮助。

我想使用 asyncio 来不中断我的应用程序中的操作(使用 pyqt5)。我想创建一些函数,当它们被调用时在 asyncio 事件循环中运行,然后当它们完成时它们会执行回调。

我想象的是。为 asyncio 创建一个单独的线程,创建循环并永远运行它。创建一些函数getFile(url, fp)get(url)readFile(file)等。然后在UI中,我有一个带有提交按钮的文本框,用户输入url,点击提交,它下载文件。

但是,我看到的每个示例,我都看不到如何将协程添加到正在运行的循环中。而且我不知道如何在不添加运行循环的情况下做我想做的事。

#!/bin/python3
import asyncio
import aiohttp
import threading

loop = asyncio.get_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def _get(url, callback):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            result = await response.text()
            callback(result)
            return

def get(url, callback):
    asyncio.ensure_future(_get(url, callback))

thread = threading.Thread(target=async_in_thread, args=(loop, ))

thread.start()

def stop():
    loop.close()

def callme(data):
    print(data)
    stop()

get("http://google.com", callme)

thread.join()

这是我想象的,但它不起作用。

【问题讨论】:

  • asyncio.ensure_future() 会这样做。
  • @MartijnPieters OP 想要从不同的线程添加协程,ensure_future 并非设计为这样做的。
  • @user4815162342:很公平。我专注于“如何给异步事件循环一个新任务”部分。

标签: python python-asyncio


【解决方案1】:

要将协程添加到在不同线程中运行的循环中,请使用asyncio.run_coroutine_threadsafe

def get(url, callback):
    asyncio.run_coroutine_threadsafe(_get(url, callback))

通常,当您从运行它的线程外部与事件循环交互时,您必须通过run_coroutine_threadsafe(对于协程)或loop.call_soon_threadsafe(对于函数)运行所有内容。例如,要停止循环,请使用loop.call_soon_threadsafe(loop.stop)。另请注意,loop.close() 不得在循环回调中调用,因此您应该将该调用放在async_in_thread 中,紧跟在对run_forever() 的调用之后,此时循环肯定已停止运行。

asyncio 的另一件事是传递显式 when_done 回调并不习惯,因为 asyncio 公开了期货的概念(类似于 JavaScript 承诺),它允许将回调附加到尚不可用的结果。例如,可以这样写_get

async def _get(url):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

它不需要callback 参数,因为任何相关方都可以使用loop.create_task 将其转换为任务,并使用add_done_callback 在任务完成时收到通知。例如:

def _get_with_callback(url, callback):
    loop = asyncio.get_event_loop()
    task = loop.create_task(_get(url))
    task.add_done_callback(lambda _fut: callback(task.result()))

在您的情况下,您不是直接处理任务,因为您的代码旨在与来自另一个线程的事件循环进行通信。但是,run_coroutine_threadsafe 返回一个非常有用的值 - 一个成熟的 concurrent.futures.Future,您可以使用它来注册完成的回调。除了接受 callback 参数外,您还可以将未来对象公开给调用者:

def get(url):
    return asyncio.run_coroutine_threadsafe(_get(url), loop)

现在调用者可以选择基于回调的方法:

future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...

或者,在适当的时候,他们甚至可以等待结果:

# give me the response, I'll wait for it
result = get(url).result()

后者按定义是阻塞的,但由于事件循环安全地运行在不同的线程中,它不受阻塞调用的影响。

【讨论】:

  • 这正是我想要做的。我只是有一个跟进,我现在如何正确关闭它?我摆脱了回调,我正在使用Future 对象,get 现在返回run_coroutine_threadsafedata = get(...) 然后我做data.result()(这一切都很好)。但是,当我在data.result() 下方执行loop.call_soon_threadsafe(loop.close) 时,我收到错误“无法关闭正在运行的事件循环”。我尝试像上面的get 一样包装loop.stop() 协程,但这也不起作用。
  • @Drew 因为loop.stop()不是协程,你应该使用loop.call_soon_threadsafe(loop.stop)。至于loop.close(),最好在loop.run_forever() 之后直接拨打async_in_thread。 (run_forever() 已完成证明 loop.stop() 已被处理并且循环不再运行。)
  • 好的,这是有道理的,因为run_forever() 正在阻塞,添加close() 什么都不做,然后在关闭应用程序之前的退出调用中我运行loop.call_soon_threadsafe(loop.stop) 然后它很好地关闭,效果很好.谢谢。
  • @Drew 在 GUI 程序的情况下,同样有效的选项是在开始时启动事件循环线程(daemon 设置为 true),并且从不打扰停止/关闭。关闭事件循环对于库(或测试运行程序等)创建的临时事件循环很重要,它用于释放循环本身持有的资源。在更典型的 asyncio 场景中,只有一个事件循环,它运行的时间与程序一样长,而在程序退出时是否调用 close() 并不重要。
【解决方案2】:

安装 QualMash 以平滑 Qt 和 asyncio 之间的集成。

项目自述文件中的示例为它的外观提供了灵感:

import sys
import asyncio
import time

from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)  # NEW must set the event loop

progress = QProgressBar()
progress.setRange(0, 99)
progress.show()

async def master():
    await first_50()
    with QThreadExecutor(1) as exec:
        await loop.run_in_executor(exec, last_50)

async def first_50():
    for i in range(50):
        progress.setValue(i)
        await asyncio.sleep(.1)

def last_50():
    for i in range(50,100):
        loop.call_soon_threadsafe(progress.setValue, i)
        time.sleep(.1)

with loop: ## context manager calls .close() when loop completes, and releases all resources
    loop.run_until_complete(master())

【讨论】:

    猜你喜欢
    • 2018-12-26
    • 1970-01-01
    • 2019-08-06
    • 2018-02-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-17
    相关资源
    最近更新 更多