【问题标题】:Websockets bridge for audio stream in FastAPIFastAPI中音频流的Websockets桥接器
【发布时间】:2021-03-29 09:10:06
【问题描述】:

目标

我的目标是使用音频流。从逻辑上讲,这是我的目标:

  1. 音频流来自WebSocket A(FastAPI端点)
  2. 音频流桥接到不同的 WebSocket B,它将返回 JSON(Rev-ai 的 WebSocket)
  3. Json 结果通过 WebSocket A 实时发回。因此,当音频流仍在进来时。

可能的解决方案

为了解决这个问题,我有很多想法,但最终我一直在尝试将WebSocket A 连接到WebSocket B。到目前为止,我的尝试涉及一个ConnectionManager 类,其中包含一个Queue.queue。音频流的块被添加到这个队列中,这样我们就不会直接从WebSocket A消费。

ConnectionManager 还包含一个生成器方法,用于从队列中产生所有值。

我的 FastAPI 实现从 websocket A 消费,如下所示:

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()

在此摄取的同时,我希望有一个任务将我们的音频流桥接到WebSocket B,并将获得的值发送到WebSocket A。音频流可以通过前面提到的generator方法来消费。

由于 WebSocket B 消费消息的方式,生成器方法是必要的,如 Rev-ai 的examples 所示:

streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)

这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获取结果。

最新尝试

我一直在与asyncio 碰碰运气;据我了解,一种可能性是创建一个在后台运行的协程。我在这方面没有成功,但听起来很有希望。

我曾考虑通过FastAPI 启动事件触发此事件,但我无法实现并发。我尝试使用event_loops,但它给了我一个nested event loop 相关的错误。

警告

FastAPI 可以是可选的,如果您的洞察力认为如此,在某种程度上,WebSocket A 也是如此。归根结底,最终目标是通过我们自己的 API 端点接收音频流,并通过 Rev.ai 运行它WebSocket,做一些额外的处理,然后将结果发回。

【问题讨论】:

    标签: python websocket python-asyncio fastapi starlette


    【解决方案1】:

    websocket 桥 websocket

    下面是一个简单的 webscoket 代理示例,其中 websocket A 和 websocket B 都是 FastAPI 应用程序中的端点,但 websocket B 可以位于其他位置,只需更改其地址 ws_b_uri。对于 websocket 客户端,使用websockets 库。

    为了进行数据转发,A端点的代码启动了两个任务forwardreverse,并通过asyncio.gather()等待它们完成。双向数据传输以并行方式进行。

    import asyncio
    
    from fastapi import FastAPI
    from fastapi import WebSocket
    import websockets
    app = FastAPI()
    
    ws_b_uri = "ws://localhost:8001/ws_b"
    
    
    async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
        while True:
            data = await ws_a.receive_bytes()
            print("websocket A received:", data)
            await ws_b.send(data)
    
    
    async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
        while True:
            data = await ws_b.recv()
            await ws_a.send_text(data)
            print("websocket A sent:", data)
    
    
    @app.websocket("/ws_a")
    async def websocket_a(ws_a: WebSocket):
        await ws_a.accept()
        async with websockets.connect(ws_b_uri) as ws_b_client:
            fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
            rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
            await asyncio.gather(fwd_task, rev_task)
    
    
    @app.websocket("/ws_b")
    async def websocket_b(ws_b_server: WebSocket):
        await ws_b_server.accept()
        while True:
            data = await ws_b_server.receive_bytes()
            print("websocket B server recieved: ", data)
            await ws_b_server.send_text('{"response": "value from B server"}')
    

    更新(桥接 websocket 同步生成器)

    考虑到问题的最后更新,问题在于 WebSocket B 隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出),事实上,任务变成了如何在 WebSocket 和同步生成器之间架起一座桥梁。由于我从未使用过rev-ai 库,因此我为streamclient.start 制作了一个存根函数stream_client_start,它接受一个生成器(原为MEDIA_GENERATOR)并返回一个生成器(原为response_generator)。

    在这种情况下,我通过run_in_executor 在单独的线程中开始处理生成器,为了不重新发明轮子,我使用来自janus 库的队列进行通信,它允许您绑定通过队列进行同步和异步代码。相应地,也有两个队列,一个为A -> B,另一个为B -> A

    
    import asyncio
    import time
    from typing import Generator
    from fastapi import FastAPI
    from fastapi import WebSocket
    import janus
    import queue
    
    app = FastAPI()
    
    
    # Stub generator function (using websocket B in internal)
    def stream_client_start(input_gen: Generator) -> Generator:
        for chunk in input_gen:
            time.sleep(1)
            yield f"Get {chunk}"
    
    
    # queue to generator auxiliary adapter
    def queue_to_generator(sync_queue: queue.Queue) -> Generator:
        while True:
            yield sync_queue.get()
    
    
    async def forward(ws_a: WebSocket, queue_b):
        while True:
            data = await ws_a.receive_bytes()
            print("websocket A received:", data)
            await queue_b.put(data)
    
    
    async def reverse(ws_a: WebSocket, queue_b):
        while True:
            data = await queue_b.get()
            await ws_a.send_text(data)
            print("websocket A sent:", data)
    
    
    def process_b_client(fwd_queue, rev_queue):
        response_generator = stream_client_start(queue_to_generator(fwd_queue))
        for r in response_generator:
            rev_queue.put(r)
    
    
    @app.websocket("/ws_a")
    async def websocket_a(ws_a: WebSocket):
        loop = asyncio.get_event_loop()
        fwd_queue = janus.Queue()
        rev_queue = janus.Queue()
        await ws_a.accept()
    
        process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
        fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
        rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
        await asyncio.gather(process_client_task, fwd_task, rev_task)
    

    【讨论】:

    • 你好@alex_noname!您的回答非常有见地,但是恐怕我应该进一步澄清一层,这将是 Rev-ai 提供的用于访问 WebSocket B 的接口,用于执行生成器。尽管如此,感谢您如何构建您的答案。我在问题中添加了一些额外的内容来澄清这一点。我想就生成器征求您的意见,如果没有开发,我会接受这个答案 - 因为它确实展示了如何桥接两个 WebSocket。
    • 我已经更新了生成器的答案,但是如果 rev-ai 库有问题,那么我可能无法提供帮助。
    • @alex_noname 非常感谢 websocket 桥的实现,正是我需要的!
    猜你喜欢
    • 2021-07-11
    • 2015-08-14
    • 1970-01-01
    • 1970-01-01
    • 2014-07-28
    • 1970-01-01
    • 1970-01-01
    • 2013-03-05
    • 2015-10-16
    相关资源
    最近更新 更多