【问题标题】:python asyncio how to read StdIn and write to StdOut?python asyncio如何读取StdIn并写入StdOut?
【发布时间】:2020-10-11 11:43:29
【问题描述】:

我需要异步读取 StdIn 以获取消息(json 由 \r\n 终止)并在处理完异步后将更新的消息写入 StdOut。

目前我正在同步进行:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload

如何异步执行相同的操作?

【问题讨论】:

    标签: python python-asyncio stdout stdin


    【解决方案1】:

    这是一个使用asyncio streamsstdin 回显到stdout 的示例(适用于Unix)。

    import asyncio
    import sys
    
    
    async def connect_stdin_stdout():
        loop = asyncio.get_event_loop()
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
        writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
        return reader, writer
    
    
    async def main():
        reader, writer = await connect_stdin_stdout()
        while True:
            res = await reader.read(100)
            if not res:
                break
            writer.write(res)
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    作为即用型解决方案,您可以使用aioconsole 库。它实现了类似的方法,但还为inputprintexeccode.interact 提供了额外有用的异步等效项:

    from aioconsole import get_standard_streams
    
    async def main():
        reader, writer = await get_standard_streams()
    

    更新:

    让我们试着弄清楚函数connect_stdin_stdout是如何工作的。

    1. 获取当前事件循环:
    loop = asyncio.get_event_loop()
    
    1. 创建StreamReader 实例。
    reader = asyncio.StreamReader()
    

    一般情况下,StreamReader/StreamWriter 类不打算直接实例化,而只能用作 open_connection()start_server() 等函数的结果。 StreamReader 为某些数据流提供缓冲异步接口。一些源(库代码)调用它的函数如feed_datafeed_eof,数据被缓冲,可以使用documented接口协程read()readline()等读取。

    1. 创建StreamReaderProtocol 实例。
    protocol = asyncio.StreamReaderProtocol(reader)
    

    该类派生自asyncio.ProtocolFlowControlMixin,有助于在ProtocolStreamReader 之间进行调整。它覆盖Protocol 方法如data_receivedeof_received 并调用StreamReader 方法feed_data

    1. 在事件循环中注册标准输入流stdin
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    

    connect_read_pipe 函数将pipe 参数作为一个类似文件的对象。 stdin 是一个类似文件的对象。从现在开始,所有从stdin读取的数据都会落入StreamReaderProtocol,然后传入StreamReader

    1. 在事件循环中注册标准输出流stdout
    w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
    

    connect_write_pipe 中,您需要传递一个协议工厂,该工厂创建协议实例,为StreamWriter.drain() 实现流控制逻辑。此逻辑在类FlowControlMixin 中实现。还有StreamReaderProtocol继承自它。

    1. 创建StreamWriter 实例。
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    

    该类将使用函数write()writelines() 等传递给它的数据转发到底层transport

    protocol 用于支持drain() 函数等待底层传输已刷新其内部缓冲区并可再次写入。

    reader是可选参数,可以是None,也用于支持drain()函数,在该函数开始时检查是否为阅读器设置了异常,例如,由于连接丢失(与套接字和双向连接相关),那么drain() 也会抛出异常。

    您可以在这个伟大的answer 中阅读有关StreamWriterdrain() 函数的更多信息。

    更新 2:

    读取带有\r\n分隔符的行可以使用readuntil

    【讨论】:

    • 你能解释一下这里的概念吗?比如reader、protocol、connect_read_pipe、w_transport、connect_write_pipe的用途是什么?所有这些元素的目的是什么以及它们如何与 conenct_read/write_pipe 一起使用?为什么需要 dummy 作为协议?为什么 writer 使用与 reader 相同的协议?我相信对于阅读行我应该使用 reader.readline()。如何将行终止设置为 \r\n 以进行读写?如您所见,我很困惑,但是您的解决方案有效。非常感谢您的回答。
    • 另外,我刚刚注意到没有等待作家。
    • 你好,我稍后会尝试更详细地描述一下
    • 提前致谢。
    • 这是我来的最远的:pastebin.com/2Rwch6Dr
    【解决方案2】:

    这是您可以从标准输入异步读取的另一种方式(一次读取一行)。

    async def async_read_stdin()->str:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, sys.stdin.readline)
    

    【讨论】:

    • 谢谢,但我更喜欢@alex_noname 的建议。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-03-08
    • 2019-11-13
    • 1970-01-01
    • 2012-11-16
    • 2011-05-05
    • 2017-08-16
    • 1970-01-01
    相关资源
    最近更新 更多