【发布时间】:2018-10-04 17:53:44
【问题描述】:
抱歉,这篇文章很长,但我已经研究了一个多星期,所以我尝试了很多不同的东西。我对 Python 非常了解,但我对 Python 中的异步或非阻塞函数没有任何经验。
我正在为需要 websocket 连接的 web 服务编写一个 API 库/模块/包/任何东西。有许多传入的消息需要处理,以及一些与控制相关的(Web 应用程序级别,而不是 websocket 控制消息),我有时需要发送这些消息。我可以轻松地通过连接接收消息并对其采取行动。我可以发送消息,但只能响应收到的消息,因为接收循环总是阻塞等待消息。我不想等待传入消息来处理传出消息,因此脚本不必挂在输入上,直到收到新消息。在我努力让双向通信按预期工作时,我发现我需要使用 Twisted、Tornado 或 asyncio 之类的东西,但到目前为止,我尝试过的每个实现都失败了。请注意,发送必须通过同一连接进行。在接收循环之外打开一个短暂的连接将不起作用。这是我到目前为止所做的:
websocket 代码的第一次迭代是使用websocket-client 包。它非常接近文档中的示例:
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
def on_message(ws, message):
# Send message frames to respective functions
# for sorting, objectification, and processing
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
def run(*args):
# Send initial frames required for server to send the desired frames
thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp(buildWebsocketURL()),
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
这会阻止循环外的任何进一步执行。我尝试学习 _thread 模块,但找不到任何迹象表明我可以从外部与 websocket 线程“通信”。我尝试设置一个 pub/sub 侦听器函数,将数据从另一个发送器函数转发到 ws.send(),但它不起作用。没有错误或任何东西,只是没有任何已发送消息的指示。
接下来我尝试了Websockets 模块。这似乎是从头开始构建以利用 asyncio。同样,我得到了一个客户端构建,它会发送初始消息并根据收到的消息采取行动,但进度停在那里:
async def wsconnection():
async with websockets.connect(getWebsocketURL()) as websocket:
while True:
message = await websocket.recv()
if message == '{"type":"broadcaster.ready"}':
subscriptions = getSubscriptions() # Get subscriptions from ident data
logging.info('Sending bookmarks to server as subscription keys')
subscriptionupdate = '{{"type": "subscribe","subscription_keys": ["{0}"],"subscription_scope": "update"}}'.format(
'","'.join(subscriptions))
subscriptioncontent = '{{"subscription_keys": ["{0}"],"subscription_scope": "content","type": "subscribe"}}'.format(
'","'.join(subscriptions))
logging.debug(subscriptioncontent)
await websocket.send(subscriptionupdate)
await websocket.send(subscriptioncontent)
await websocket.send(
'{"type":"message_lobby.read","lobby_id":"1","message_id:"16256829"}')
sortframe(message)
asyncio.get_event_loop().run_until_complete(wsconnection())
我尝试了前面提到的 pub/sub 监听器,但无济于事。在更彻底地阅读了这个模块的文档后,我尝试在循环之外获取 websocket 协议对象(包含 send() 和 recv() 方法),然后创建两个协程(?),一个监听传入消息,一个监听并发送传出消息。到目前为止,如果不在 wsconnection() 函数的范围内运行 async with websockets.connect(getWebsocketURL()) as websocket: 行,我完全无法获取 websocket 协议对象。我尝试使用websocket = websockets.client.connect(),根据docs,我认为会设置我需要的协议对象,但事实并非如此。在没有广泛的 asyncio 知识的情况下,我能找到的所有示例似乎都没有揭示以我需要的方式构建 websockets 发送器和接收器的任何明显方法。
我还使用 asyncio 和 Twisted 使用了与上述类似的代码结构的 autobahn,但我遇到了与上述相同的所有问题。
到目前为止,我得到的最接近的是上面的 Websockets 包。文档中有一个用于发送/接收连接的example snippet,但我无法真正阅读那里发生的事情,因为它都非常特定于 asyncio。总的来说,我真的很难理解 asyncio,我认为一个大问题是它最近似乎发展得非常迅速,因此围绕这些冲突有大量非常特定于版本的信息。不利于学习,不幸的是。 ~~~~这是我尝试使用该示例的方法,它连接,接收初始消息,然后连接丢失/关闭:
async def producer(message):
print('Sending message')
async def consumer_handler(websocket, path):
while True:
message = await websocket.recv()
await print(message)
await pub.sendMessage('sender', message)
async def producer_handler(websocket, path):
while True:
message = await producer()
await websocket.send(message)
async def wsconnect():
async with websockets.connect(getWebsocketURL()) as websocket:
path = "443"
async def handler(websocket, path):
consumer_task = asyncio.ensure_future(
consumer_handler(websocket, path))
producer_task = asyncio.ensure_future(
producer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
pub.subscribe(producer, 'sender')
asyncio.get_event_loop().run_until_complete(wsconnect())
那么,我如何构建此代码以通过同一个 websocket 连接进行发送和接收?当 websocket 连接打开时,我还可以在同一个脚本中进行各种 API 调用,这使事情变得更加复杂。
我使用的是 Python 3.6.6,此脚本旨在作为模块导入到其他脚本中,因此 websocket 功能需要封装在函数或类中以供外部调用。
【问题讨论】:
-
如果你能专注于一种方法,回答这个问题会更容易。例如,由于您在
python-asyncio下发布,第二个使用 asyncio websockets。
标签: python websocket python-3.6 python-asyncio