【问题标题】:listen to multiple socket with websockets and asyncio使用 websockets 和 asyncio 监听多个套接字
【发布时间】:2018-04-16 13:02:35
【问题描述】:

我正在尝试在 python 中创建一个脚本,该脚本使用 websockets 和 asyncio 侦听多个套接字,问题是无论我做什么,它只听我调用的第一个套接字。 我认为它是无限循环,我有什么选择来解决这个问题?为每个套接字使用线程?

  async def start_socket(self, event):
    payload = json.dumps(event)
    loop = asyncio.get_event_loop()

    self.tasks.append(loop.create_task(
        self.subscribe(event)))

    # this should not block the rest of the code
    await asyncio.gather(*tasks)


  def test(self):
    # I want to be able to add corotines at a different time
    self.start_socket(event1)
    # some code
    self.start_socket(event2)

【问题讨论】:

    标签: python websocket python-asyncio


    【解决方案1】:

    这就是我最终所做的,这样它就不会阻塞主线程并且所有订阅都在并行工作。

    def subscribe(self, payload):
        ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        ws.connect(url)
        ws.send(payload)
        while True:
            result = ws.recv()
            print("Received '%s'" % result)
    
        def start_thread(self, loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    def start_socket(self, **kwargs):
        worker_loop = asyncio.new_event_loop()
        worker = Thread(target=self.start_thread, args=(worker_loop,))
        worker.start()
    
        worker_loop.call_soon_threadsafe(self.subscribe, payload)
    
    
    def listen(self):
        self.start_socket(payload1)
        # code
        self.start_socket(payload2)
        # code
        self.start_socket(payload3)
    

    【讨论】:

      【解决方案2】:

      您的代码看起来不完整,但您显示的内容有两个问题。一个是run_until_complete 接受协程对象(或其他类型的未来),而不是协程函数。所以应该是:

      # note parentheses after your_async_function()
      asyncio.get_event_loop().run_until_complete(your_async_function())
      

      问题是,无论我做什么,它只听我调用的第一个套接字。我认为它是无限循环,我有什么选择来解决这个问题?为每个套接字使用线程?

      无限循环不是问题,asyncio 旨在支持这种“无限循环”。问题是你试图在一个协程中完成所有事情,而你应该为每个 websocket 创建一个协程。这不是问题,因为协程非常轻量级。

      例如(未经测试):

      async def subscribe_all(self, payload):
          loop = asyncio.get_event_loop()
          # create a task for each URL
          for url in url_list:
              tasks.append(loop.create_task(self.subscribe_one(url, payload)))
          # run all tasks in parallel
          await asyncio.gather(*tasks)
      
      async def subsribe_one(self, url, payload):
          async with websockets.connect(url) as websocket:
              await websocket.send(payload)
              while True:
                  msg = await websocket.recv()
                  print(msg)
      

      【讨论】:

      • await asyncio.gather(*tasks),给了我一个 RuntimeWarning: coroutine 'subscribe_all' is never awaited (the async function is not executed)....如果我使用 run_until_complete(gather(. .)) 但是它阻塞了线程和它没有执行后的代码
      • @joseRo 如果没有其余代码,很难判断发生了什么。你可能应该在等待来自另一个协程的subscribe_all...
      • @joseRo 谁打电话给test()
      • 另一个文件,直接导入调用即可。
      • @joseRo 我的意思是在 asyncio 的上下文中 - 调用者是 async def 还是普通函数?事件循环是否已经在运行,或者稍后会有人运行它?
      【解决方案3】:

      从 websocket 服务器高效监听多个 websocket 连接的一种方法是保留一个已连接客户端的列表,并基本上同时处理多个对话。

      例如一个简单的服务器,每隔几秒向每个连接的客户端发送随机 #:

      import os
      import asyncio
      import websockets
      import random 
      
      websocket_clients = set()
      
      async def handle_socket_connection(websocket, path):
          """Handles the whole lifecycle of each client's websocket connection."""
          websocket_clients.add(websocket)
          print(f'New connection from: {websocket.remote_address} ({len(websocket_clients)} total)')
          try:
              # This loop will keep listening on the socket until its closed. 
              async for raw_message in websocket:
                  print(f'Got: [{raw_message}] from socket [{id(websocket)}]')
          except websockets.exceptions.ConnectionClosedError as cce:
              pass
          finally:
              print(f'Disconnected from socket [{id(websocket)}]...')
              websocket_clients.remove(websocket)
      
      async def broadcast_random_number(loop):
          """Keeps sending a random # to each connected websocket client"""
          while True:
              for c in websocket_clients:
                  num = str(random.randint(10, 99))
                  print(f'Sending [{num}] to socket [{id(c)}]')
                  await c.send(num)
              await asyncio.sleep(2)
      
      if __name__ == "__main__":
          loop = asyncio.get_event_loop()
          try:
              socket_server = websockets.serve(handle_socket_connection, 'localhost', 6789)
              print(f'Started socket server: {socket_server} ...')
              loop.run_until_complete(socket_server)
              loop.run_until_complete(broadcast_random_number(loop))
              loop.run_forever()
          finally:
              loop.close()
              print(f"Successfully shutdown [{loop}].")
      

      一个连接到服务器并监听数字的简单客户端:

      import asyncio
      import random
      import websockets
      
      async def handle_message():
          uri = "ws://localhost:6789"
          async with websockets.connect(uri) as websocket:
              msg = 'Please send me a number...'
              print(f'Sending [{msg}] to [{websocket}]')
              await websocket.send(msg)
              while True:
                  got_back = await websocket.recv()
                  print(f"Got: {got_back}")
      
      asyncio.get_event_loop().run_until_complete(handle_message())
      

      混合线程和asyncio 麻烦大于其价值,并且您仍然有代码会阻塞网络IO 等最浪费的步骤(这是使用asyncio 的基本好处)。

      您需要在 事件循环 中异步运行每个 协程,使用 await 调用任何阻塞调用,并定义与任何 awaitable 与async 的交互

      查看工作,例如:https://github.com/adnantium/websocket_client_server

      【讨论】:

        猜你喜欢
        • 2014-07-18
        • 1970-01-01
        • 1970-01-01
        • 2017-10-17
        • 1970-01-01
        • 2011-02-15
        • 1970-01-01
        • 1970-01-01
        • 2021-12-05
        相关资源
        最近更新 更多