【问题标题】:Simple way to test websocket availability in python在 python 中测试 websocket 可用性的简单方法
【发布时间】:2018-10-07 19:09:28
【问题描述】:

我正在使用以下代码来测试本地 websocket 服务器是否正在运行:

import asyncio
import websockets

async def hello():
    async with websockets.connect('ws://127.0.0.1:8000/ws/registration/123') as websocket:
        await websocket.send(json_data)

asyncio.get_event_loop().run_until_complete(hello())

有没有更简单的方法可以在不使用 asyncio 的情况下做到这一点?比如:

import asyncio
import websockets

conn = websockets.connect('ws://127.0.0.1:8000/ws/registration/123')
conn.send('hello')

基本上,我只是想找到最简单的方法来测试我的 websocket 服务器是否正在侦听和接收特定 url 的消息。

【问题讨论】:

    标签: python websocket


    【解决方案1】:

    async_to_sync 不是让这更复杂吗?为什么不直接创建一个普通的test_url 函数:

    def test_url(url, data=""):
        async def inner():
            async with websockets.connect(url) as websocket:
                await websocket.send(data)
        return asyncio.get_event_loop().run_until_complete(inner())
    
    test_url("ws://127.0.0.1:8000/ws/registration/123")
    

    【讨论】:

    • 也许,我是使用 async、asncid 和 async_to_sync 的新手,所以现在开始学习。你的代码和我的代码有区别吗?还是您的只是一种更清洁的方法?
    • @David542 基本上这只是一种更清洁的方法。但无论如何你不需要依赖asgiref lib。需要明确的是,async_to_sync 只是这样的包装器。里面没有魔法。
    【解决方案2】:

    您可以使用async_to_sync 进行上述操作,例如:

    from asgiref.sync import async_to_sync
    import websockets
    
    def test_url(url, data=""):
        conn = async_to_sync(websockets.connect)(url)
        async_to_sync(conn.send)(data)
    
    
    test_url("ws://127.0.0.1:8000/ws/registration/123")
    

    请注意,“握手”可能不会在此处完成,因为它需要两种方式都接受,但以上内容应该使您能够测试以确保正确路由 url,等等。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多