websocket 桥 websocket
下面是一个简单的 webscoket 代理示例,其中 websocket A 和 websocket B 都是 FastAPI 应用程序中的端点,但 websocket B 可以位于其他位置,只需更改其地址 ws_b_uri。对于 websocket 客户端,使用websockets 库。
为了进行数据转发,A端点的代码启动了两个任务forward和reverse,并通过asyncio.gather()等待它们完成。双向数据传输以并行方式进行。
import asyncio
from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()
ws_b_uri = "ws://localhost:8001/ws_b"
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await ws_b.send(data)
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_b.recv()
await ws_a.send_text(data)
print("websocket A sent:", data)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
await ws_a.accept()
async with websockets.connect(ws_b_uri) as ws_b_client:
fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
await asyncio.gather(fwd_task, rev_task)
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
await ws_b_server.accept()
while True:
data = await ws_b_server.receive_bytes()
print("websocket B server recieved: ", data)
await ws_b_server.send_text('{"response": "value from B server"}')
更新(桥接 websocket 同步生成器)
考虑到问题的最后更新,问题在于 WebSocket B 隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出),事实上,任务变成了如何在 WebSocket 和同步生成器之间架起一座桥梁。由于我从未使用过rev-ai 库,因此我为streamclient.start 制作了一个存根函数stream_client_start,它接受一个生成器(原为MEDIA_GENERATOR)并返回一个生成器(原为response_generator)。
在这种情况下,我通过run_in_executor 在单独的线程中开始处理生成器,为了不重新发明轮子,我使用来自janus 库的队列进行通信,它允许您绑定通过队列进行同步和异步代码。相应地,也有两个队列,一个为A -> B,另一个为B -> A。
import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue
app = FastAPI()
# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
for chunk in input_gen:
time.sleep(1)
yield f"Get {chunk}"
# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
while True:
yield sync_queue.get()
async def forward(ws_a: WebSocket, queue_b):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await queue_b.put(data)
async def reverse(ws_a: WebSocket, queue_b):
while True:
data = await queue_b.get()
await ws_a.send_text(data)
print("websocket A sent:", data)
def process_b_client(fwd_queue, rev_queue):
response_generator = stream_client_start(queue_to_generator(fwd_queue))
for r in response_generator:
rev_queue.put(r)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
loop = asyncio.get_event_loop()
fwd_queue = janus.Queue()
rev_queue = janus.Queue()
await ws_a.accept()
process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
await asyncio.gather(process_client_task, fwd_task, rev_task)