【问题标题】:asyncio server fails to write to the client via a loop(asyncio 服务器无法通过循环向客户端写入数据)
【发布时间】:2021-12-01 17:47:47
【问题描述】:

下面提供了一个 MWE(最小可复现示例)来演示该问题。当使用 asyncio 在循环中向客户端发送字节时,服务器会在第一次迭代时正常发送,但在随后的迭代中只是把字节放入缓冲区而始终不发送。客户端一直在等待更多字节,但由于某种原因这些字节始终没有到达。如果用 SIGINT(ctrl+c)关闭服务器程序,服务器会一次性把缓冲区中的全部内容发送给客户端。

import asyncio
import socket
from time import sleep
import sys

class server_protocol(asyncio.Protocol):
    """Protocol from the question: write a fixed payload in a loop.

    NOTE: this listing is the reproduction of the reported problem and is
    kept as posted. ``write_loop`` is an ordinary synchronous method that
    calls the blocking ``time.sleep`` and is invoked directly from
    ``data_received``; while it is looping, control does not go back to the
    event loop, and the flag it polls is only ever changed by the
    ``connection_lost`` / ``eof_received`` callbacks.
    """

    def __init__(self):
        # Considered closed until connection_made() is called.
        self.connection_closed = True

    def connection_made(self, transport):
        """Remember the transport and reset the per-connection state."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport
        self.write_counter = 0
        self.connection_closed = False
        
    def write_loop(self):
        """Write the payload once per iteration until the flag is set.

        Each iteration prints the running counter together with the
        transport's current write-buffer size, queues the payload with
        ``transport.write`` and then sleeps for one second.
        """
        while not self.connection_closed:
            self.write_counter+=1
            print(self.write_counter,'Writing bytes, Buffer Size:',self.transport.get_write_buffer_size())
            self.transport.write(b'AhXTMkHrJHdExaKBLmkvTnvRduENcusjnRnrBAHtjnUMtdjxsnKgDRtpDMjncFczrqwjrSrVNwxtBSmmLJnFAfkgbDwEuBAcdVMCLVeMuSXfxyYRdaNvvEhEFnGWNNtk')
            # Blocking sleep (time.sleep): this is a plain function, so
            # nothing else in this thread runs during the pause.
            sleep(1)
        self.transport.close()
        
    def data_received(self,data):
        """Start the write loop; the received bytes themselves are ignored."""
        # Direct synchronous call: this callback does not return until
        # write_loop() does.
        self.write_loop()
        
    def connection_lost(self,exc):
        """Mark the connection as closed so write_loop() can stop."""
        print("Connection lost")
        self.connection_closed = True
        
    def eof_received(self):
        """Mark the connection as closed when the peer signals EOF."""
        print("EOF received")
        self.connection_closed = True
        
def client(port):
    """Connect to the server on localhost and print everything it sends.

    Sends ``b'start'`` once to trigger the server, then reads chunks of up
    to 256 bytes and prints each one until the server closes the
    connection (signalled by an empty read).

    Args:
        port: TCP port of the server on ``localhost``.

    Raises:
        OSError: if connecting, sending or receiving fails.
    """
    server_address = ('localhost', port)
    print('connecting to {}:{}'.format(*server_address))
    # The context manager guarantees the socket is closed on every path,
    # including a failed connect(), which previously left it unclosed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.connect(server_address)
            # Send data
            message = b'start'
            print('sending {!r}'.format(message))
            sock.sendall(message)
            while True:
                print('Receiving')
                data = sock.recv(256)
                print('Received {!r}'.format(data))
                # An empty read means the peer closed the connection.
                if not data:
                    break
        finally:
            print('closing socket')
        
async def main(type, port):
    """Run either the asyncio server or the blocking client.

    Args:
        type: ``'server'`` to listen on ``localhost:port`` forever, or
            ``'client'`` to run the client; any other value does nothing.
        port: TCP port to listen on or to connect to.
    """
    if type == 'client':
        client(port)
        return
    if type != 'server':
        return
    running_loop = asyncio.get_running_loop()
    listener = await running_loop.create_server(server_protocol, 'localhost', port)
    async with listener:
        await listener.serve_forever()
        
if __name__ == '__main__':
    # Exactly two command-line arguments are expected: the mode and the port.
    if len(sys.argv) != 3:
        print("provide argument 'server' or 'client' and port")
    else:
        mode, port_text = sys.argv[1:]
        asyncio.run(main(mode, int(port_text)))
        

【问题讨论】:

    标签: python asynchronous python-asyncio


    【解决方案1】:

    编辑:根据dirn 的评论更新答案

    问题出在 write_loop 上。它需要是一个 async 方法,通过 asyncio.ensure_future 来运行,并且 sleep 需要换成 asyncio.sleep。原始示例中的 write_loop 会阻塞事件循环,导致事件循环无法运行其他任何任务。

    更正的 MWE:

    import asyncio
    import socket
    import sys
    
    class server_protocol(asyncio.Protocol):
        """Send a fixed payload to the client once per second.

        The periodic writer is started by the first data received on the
        connection. It runs as a background task and pauses with
        ``asyncio.sleep``, so the event loop stays free to flush the
        transport between writes.
        """

        def __init__(self):
            # Considered closed until connection_made() is called.
            self.connection_closed = True
            self.transport = None
            self.write_counter = 0
            # Handle of the single writer task. Keeping it gives the task a
            # strong reference and lets connection_lost() cancel it.
            self._write_task = None

        def connection_made(self, transport):
            """Remember the transport and reset the per-connection state."""
            peername = transport.get_extra_info('peername')
            print('Connection from {}'.format(peername))
            self.transport = transport
            self.write_counter = 0
            self.connection_closed = False

        async def write_loop(self):
            """Write the payload once per second until the connection closes."""
            while not self.connection_closed:
                self.write_counter += 1
                print(self.write_counter,'Writing bytes, Buffer Size:',self.transport.get_write_buffer_size())
                self.transport.write(b'AhXTMkHrJHdExaKBLmkvTnvRduENcusjnRnrBAHtjnUMtdjxsnKgDRtpDMjncFczrqwjrSrVNwxtBSmmLJnFAfkgbDwEuBAcdVMCLVeMuSXfxyYRdaNvvEhEFnGWNNtk')
                # Non-blocking pause: yields to the event loop so the
                # buffered bytes can actually be sent.
                await asyncio.sleep(1)
            self.transport.close()

        def data_received(self, data):
            """Start the writer on the first chunk; later chunks are ignored."""
            # Without this guard every received chunk would spawn another
            # concurrent write loop on the same transport.
            if self._write_task is None:
                self._write_task = asyncio.ensure_future(self.write_loop())

        def connection_lost(self, exc):
            """Mark the connection closed and stop the writer immediately."""
            print("Connection lost")
            self.connection_closed = True
            if self._write_task is not None:
                # Cancel rather than wait for the current sleep to finish;
                # cancelling an already finished task is a no-op.
                self._write_task.cancel()

        def eof_received(self):
            """Mark the connection closed when the peer signals EOF."""
            print("EOF received")
            self.connection_closed = True
            
    def client(port):
        """Connect to the server on localhost and print everything it sends.

        Sends ``b'start'`` once to trigger the server, then reads chunks of
        up to 256 bytes and prints each one until the server closes the
        connection (signalled by an empty read).

        Args:
            port: TCP port of the server on ``localhost``.

        Raises:
            OSError: if connecting, sending or receiving fails.
        """
        server_address = ('localhost', port)
        print('connecting to {}:{}'.format(*server_address))
        # The context manager guarantees the socket is closed on every path,
        # including a failed connect(), which previously left it unclosed.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.connect(server_address)
                # Send data
                message = b'start'
                print('sending {!r}'.format(message))
                sock.sendall(message)
                while True:
                    print('Receiving')
                    data = sock.recv(256)
                    print('Received {!r}'.format(data))
                    # An empty read means the peer closed the connection.
                    if not data:
                        break
            finally:
                print('closing socket')
            
    async def main(type, port):
        """Run either the asyncio server or the blocking client.

        Args:
            type: ``'server'`` to listen on ``localhost:port`` forever, or
                ``'client'`` to run the client; any other value does nothing.
            port: TCP port to listen on or to connect to.
        """
        if type == 'client':
            client(port)
            return
        if type != 'server':
            return
        running_loop = asyncio.get_running_loop()
        listener = await running_loop.create_server(server_protocol, 'localhost', port)
        async with listener:
            await listener.serve_forever()
            
    if __name__ == '__main__':
        # Exactly two command-line arguments are expected: the mode and the port.
        if len(sys.argv) != 3:
            print("provide argument 'server' or 'client' and port")
        else:
            mode, port_text = sys.argv[1:]
            asyncio.run(main(mode, int(port_text)))
            
    

    【讨论】:

    • 问题不在于 sleep 是同步的,而在于 write_loop 是同步的(在 OP 中)。如果它没有办法将控制权交还给事件循环,其他任何代码都不会有机会运行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-20
    • 1970-01-01
    • 1970-01-01
    • 2011-04-28
    • 2018-06-30
    • 2021-04-16
    相关资源
    最近更新 更多