主要问题是,当您编写响应 http(s) 的 Web 应用程序时,您的代码有一个非常特殊的“生命周期”:通常您有一个“查看”函数来获取请求数据,执行收集响应数据并将其返回所需的所有操作。
大多数 Web 框架中的“视图”功能必须独立于系统的其余部分 - 它应该能够执行其职责,除了调用时获取的数据或对象之外的其他数据或对象 - 这是请求数据, 和系统配置 - 使应用程序服务器(旨在将您的程序实际连接到互联网的框架部分)可以选择多种方式来为您的程序提供服务:它们可以在多个并行线程中运行您的视图功能,或者在多个并行进程,甚至在各种容器或物理服务器中的不同进程中:您的应用程序不需要关心这些。
如果您想要一个在 调用您的视图函数时可用的资源,您需要打破这种范式。例如,通常,框架会希望创建一个数据库连接池,以便同一进程上的视图可以重用这些连接。这些数据库连接通常由框架本身提供,框架本身实现了一种允许重用的机制,并在需要时以透明的方式可用。如果您想保持 websocket 连接处于活动状态,您必须重新创建一个相同类型的机制。
从某种意义上说,您需要一个 Python 对象,该对象可以调解您的 websocket 数据,其行为类似于您的 web 视图函数的“服务器”。
这比听起来更简单——一个特殊的 Python 类被设计为每个进程有一个实例,它保持连接,并且能够发送和接收从并行调用接收的数据而不会破坏它就足够了。确保此实例存在于当前进程中的可调用对象足以在配置为将您的应用程序提供给网络的任何策略下工作。
如果您使用的是不使用 asyncio 的 Flask,您会遇到更复杂的情况 - 您将失去视图中的异步功能,他们将不得不等待 websocket 申请完成 - 然后它将是您的应用程序服务器的工作是在不同的线程或进程中查看您的视图以确保可用性。而且,你的工作是让你的 websocket 的 asyncio 循环在一个单独的线程中运行,以便它可以发出它需要的请求。
这是一些示例代码。
请注意,除了每个进程使用一个 websocket,
如果发生任何类型的故障,这没有任何规定,但是,
最重要的是:它什么都不做并行:全部
成对的发送接收被阻塞,因为你没有给出任何线索
一种允许将每条传出消息配对的机制
和它的回应。
import asyncio
import threading
from queue import Queue
class AWebSocket:
instance = None
def __new__(cls, *args, **kw):
if cls.instance:
return cls.instance
return super().__new__(cls, *args, **kw)
def __init__(self, *args, **kw):
cls = self.__class__
if cls.instance:
# init will be called even if new finds the existing instance,
# so we have to check again
return
self.outgoing = Queue()
self.responses = Queue()
self.socket_thread = threading.Thread(target=self.start_socket)
self.socket_thread.start()
def start_socket():
# starts an async loop in a separate thread, and keep
# the web socket running, in this separate thread
asyncio.get_event_loop().run_until_complete(self.core())
def core(self):
self.socket = websockets.connect(uri)
async def _send(self, websocket, payload):
await websocket.send(json.dumps(payload).encode("utf-8"))
async def _recv(self, websocket):
data = await websocket.recv()
return json.loads(data)
async def core(self):
uri = f"wss://the-third-party-server.com/xyz"
async with websockets.connect(uri) as websocket:
self.websocket = websocket
while True:
# This code is as you wrote it:
# it essentially blocks until a message is sent
# and the answer is received back.
# You have to have a mechanism in your websocket
# messages allowing you to identify the corresponding
# answer to each request. On doing so, this is trivially
# paralellizable simply by calling asyncio.create_task
# instead of awaiting on asyncio.gather
payload = self.outgoing.get()
future = self._send(websocket, payload)
future_r = self._recv(websocket)
_, response = await asyncio.gather(future, future_r)
self.responses.put(response)
def send(self, payload):
# This is the method you call from your views
# simply do:
# `output = AWebSocket().send(payload)`
self.outgoing.put(payload)
return self.responses.get()