这是一个使用asyncio streams 将stdin 回显到stdout 的示例(适用于Unix)。
import asyncio
import sys
async def connect_stdin_stdout():
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
return reader, writer
async def main():
reader, writer = await connect_stdin_stdout()
while True:
res = await reader.read(100)
if not res:
break
writer.write(res)
if __name__ == "__main__":
asyncio.run(main())
作为即用型解决方案,您可以使用aioconsole 库。它实现了类似的方法,但还为input、print、exec 和code.interact 提供了额外有用的异步等效项:
from aioconsole import get_standard_streams
async def main():
reader, writer = await get_standard_streams()
更新:
让我们试着弄清楚函数connect_stdin_stdout是如何工作的。
- 获取当前事件循环:
loop = asyncio.get_event_loop()
- 创建
StreamReader 实例。
reader = asyncio.StreamReader()
一般情况下,StreamReader/StreamWriter 类不打算直接实例化,而只能用作 open_connection() 和 start_server() 等函数的结果。
StreamReader 为某些数据流提供缓冲异步接口。一些源(库代码)调用它的函数如feed_data、feed_eof,数据被缓冲,可以使用documented接口协程read()、readline()等读取。
- 创建
StreamReaderProtocol 实例。
protocol = asyncio.StreamReaderProtocol(reader)
该类派生自asyncio.Protocol 和FlowControlMixin,有助于在Protocol 和StreamReader 之间进行调整。它覆盖Protocol 方法如data_received、eof_received 并调用StreamReader 方法feed_data。
- 在事件循环中注册标准输入流
stdin。
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
connect_read_pipe 函数将pipe 参数作为一个类似文件的对象。 stdin 是一个类似文件的对象。从现在开始,所有从stdin读取的数据都会落入StreamReaderProtocol,然后传入StreamReader
- 在事件循环中注册标准输出流
stdout。
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
在connect_write_pipe 中,您需要传递一个协议工厂,该工厂创建协议实例,为StreamWriter.drain() 实现流控制逻辑。此逻辑在类FlowControlMixin 中实现。还有StreamReaderProtocol继承自它。
- 创建
StreamWriter 实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
该类将使用函数write()、writelines() 等传递给它的数据转发到底层transport。
protocol 用于支持drain() 函数等待底层传输已刷新其内部缓冲区并可再次写入。
reader是可选参数,可以是None,也用于支持drain()函数,在该函数开始时检查是否为阅读器设置了异常,例如,由于连接丢失(与套接字和双向连接相关),那么drain() 也会抛出异常。
您可以在这个伟大的answer 中阅读有关StreamWriter 和drain() 函数的更多信息。
更新 2:
读取带有\r\n分隔符的行可以使用readuntil