【问题标题】:Sending and receiving frames over the same websocket connection without blocking通过同一个 websocket 连接发送和接收帧而不阻塞
【发布时间】:2018-10-04 17:53:44
【问题描述】:

抱歉,这篇文章很长,但我已经研究了一个多星期,所以我尝试了很多不同的东西。我对 Python 非常了解,但我对 Python 中的异步或非阻塞函数没有任何经验。

我正在为需要 websocket 连接的 web 服务编写一个 API 库/模块/包/任何东西。有许多传入的消息需要处理,以及一些与控制相关的(Web 应用程序级别,而不是 websocket 控制消息),我有时需要发送这些消息。我可以轻松地通过连接接收消息并对其采取行动。我可以发送消息,但只能响应收到的消息,因为接收循环总是阻塞等待消息。我不想等待传入消息来处理传出消息,因此脚本不必挂在输入上,直到收到新消息。在我努力让双向通信按预期工作时,我发现我需要使用 Twisted、Tornado 或 asyncio 之类的东西,但到目前为止,我尝试过的每个实现都失败了。请注意,发送必须通过同一连接进行。在接收循环之外打开一个短暂的连接将不起作用。这是我到目前为止所做的:


websocket 代码的第一次迭代是使用websocket-client 包。它非常接近文档中的示例:

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    # Send message frames to respective functions
    # for sorting, objectification, and processing

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        # Send initial frames required for server to send the desired frames
    thread.start_new_thread(run, ())


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(buildWebsocketURL()),
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

这会阻止循环外的任何进一步执行。我尝试学习 _thread 模块,但找不到任何迹象表明我可以从外部与 websocket 线程“通信”。我尝试设置一个 pub/sub 侦听器函数,将数据从另一个发送器函数转发到 ws.send(),但它不起作用。没有错误或任何东西,只是没有任何已发送消息的指示。


接下来我尝试了Websockets 模块。这似乎是从头开始构建以利用 asyncio。同样,我得到了一个客户端构建,它会发送初始消息并根据收到的消息采取行动,但进度停在那里:

async def wsconnection():
        async with websockets.connect(getWebsocketURL()) as websocket:
            while True:
                message = await websocket.recv()
                if message == '{"type":"broadcaster.ready"}':
                    subscriptions = getSubscriptions()  # Get subscriptions from ident data
                    logging.info('Sending bookmarks to server as subscription keys')
                    subscriptionupdate = '{{"type": "subscribe","subscription_keys": ["{0}"],"subscription_scope": "update"}}'.format(
                        '","'.join(subscriptions))
                    subscriptioncontent = '{{"subscription_keys": ["{0}"],"subscription_scope": "content","type": "subscribe"}}'.format(
                        '","'.join(subscriptions))
                    logging.debug(subscriptioncontent)
                    await websocket.send(subscriptionupdate)
                    await websocket.send(subscriptioncontent)
                    await websocket.send(
                        '{"type":"message_lobby.read","lobby_id":"1","message_id:"16256829"}')
                sortframe(message) 

asyncio.get_event_loop().run_until_complete(wsconnection())

我尝试了前面提到的 pub/sub 监听器,但无济于事。在更彻底地阅读了这个模块的文档后,我尝试在循环之外获取 websocket 协议对象(包含 send() 和 recv() 方法),然后创建两个协程(?),一个监听传入消息,一个监听并发送传出消息。到目前为止,如果不在 wsconnection() 函数的范围内运行 async with websockets.connect(getWebsocketURL()) as websocket: 行,我完全无法获取 websocket 协议对象。我尝试使用websocket = websockets.client.connect(),根据docs,我认为会设置我需要的协议对象,但事实并非如此。在没有广泛的 asyncio 知识的情况下,我能找到的所有示例似乎都没有揭示以我需要的方式构建 websockets 发送器和接收器的任何明显方法。


我还使用 asyncio 和 Twisted 使用了与上述类似的代码结构的 autobahn,但我遇到了与上述相同的所有问题。

到目前为止,我得到的最接近的是上面的 Websockets 包。文档中有一个用于发送/接收连接的example snippet,但我无法真正阅读那里发生的事情,因为它都非常特定于 asyncio。总的来说,我真的很难理解 asyncio,我认为一个大问题是它最近似乎发展得非常迅速,因此围绕这些冲突有大量非常特定于版本的信息。不利于学习,不幸的是。 ~~~~这是我尝试使用该示例的方法,它连接,接收初始消息,然后连接丢失/关闭:

async def producer(message):
    print('Sending message')

async def consumer_handler(websocket, path):
    while True:
        message = await websocket.recv()
        await print(message)
        await pub.sendMessage('sender', message)

async def producer_handler(websocket, path):
    while True:
        message = await producer()
        await websocket.send(message)

async def wsconnect():
        async with websockets.connect(getWebsocketURL()) as websocket:
            path = "443"
            async def handler(websocket, path):
                consumer_task = asyncio.ensure_future(
                    consumer_handler(websocket, path))
                producer_task = asyncio.ensure_future(
                    producer_handler(websocket, path))
                done, pending = await asyncio.wait(
                    [consumer_task, producer_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()

pub.subscribe(producer, 'sender')

asyncio.get_event_loop().run_until_complete(wsconnect())

那么,我如何构建此代码以通过同一个 websocket 连接进行发送和接收?当 websocket 连接打开时,我还可以在同一个脚本中进行各种 API 调用,这使事情变得更加复杂。

我使用的是 Python 3.6.6,此脚本旨在作为模块导入到其他脚本中,因此 websocket 功能需要封装在函数或类中以供外部调用。

【问题讨论】:

  • 如果你能专注于一种方法,回答这个问题会更容易。例如,由于您在python-asyncio 下发布,第二个使用 asyncio websockets。

标签: python websocket python-3.6 python-asyncio


【解决方案1】:

我和你的情况完全一样。我知道这是一个非常不雅的解决方案 因为它仍然不是全双工的,但我似乎无法在互联网或 stackoverflow 上找到任何涉及 asyncio 和我使用的 websockets 模块的示例。

我不认为我完全理解您的 websockets 示例(是服务器端代码还是客户端代码?)但我将解释我的情况和“解决方案”,也许这对您也有用。

所以我有一个服务器主函数,它有一个 websocket 使用 recv() 在循环中监听消息。当我发送“开始”时,它将启动一个函数,该函数将每秒向浏览器中的 javascript 客户端发送数据。但是,当函数发送数据时,我有时想暂停或停止来自客户端的数据流发送停止消息。问题是,当我在数据发送已经开始时使用 recv() 时,服务器停止发送数据,只等待消息。我尝试了线程、多处理和其他一些东西,但最终我找到了一个暂时的解决方案,即在客户端收到一条数据后立即向服务器发送“pong”消息,以便服务器在下一次循环迭代时继续发送数据或例如,如果“pong”消息是“停止”,则停止发送数据,但这不是真正的双工,只是快速半双工......

我的python“服务器”上的代码

 async def start_server(self,websocket,webserver_path):
    self.websocket = websocket
    self.webserver_path = webserver_path
    while True:
        command = await self.websocket.recv()
        print("received command")
        if command == "start":
            await self.analyze()
        asyncio.sleep(1)

在我的分析函数中:

        for i,row in enumerate(data)
            await self.websocket.send(json.dumps(row))
            msg = await self.websocket.recv()
            if msg == "stop":
                self.stopFlag = True
                return
            await asyncio.sleep(1)

主要

start_server = websockets.serve(t.start_server, "127.0.0.1", 5678)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

javascript 客户端上的代码

var ws = new WebSocket("ws://127.0.0.1:5678/");
        ws.onmessage = function (event) {
            var datapoint = JSON.parse(event.data);
            console.log(counter);
            counter++;
            data.push(datapoint);
            if (data.length > 40){
                var element = data.shift();
                render(data);
            }
            ws.send("pong");//sending dummy message to let server continue
        };

我知道这不是解决方案,我希望其他人提供更好的解决方案,但由于我有相同或非常相似的问题并且没有其他答案,我决定发布并希望它有所帮助。

【讨论】:

    猜你喜欢
    • 2012-10-05
    • 1970-01-01
    • 1970-01-01
    • 2012-08-24
    • 2021-12-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多