【问题标题】:Why is Nginx truncating the gRPC streaming response?为什么 Nginx 会截断 gRPC 流式响应?
【发布时间】:2021-12-27 19:55:45
【问题描述】:

我之前问过这个问题,但我决定删除那个旧问题并将其与minimum reproducible example 一起重新表述。问题是,当我在 nginx 上部署我的 gunicorn 网络服务器时,我通过 gRPC 来自我的 go 服务器的流式响应被截断。所有详细信息都可以在存储库中找到。我对这个站点的 nginx 配置如下所示:

server {
    listen 80 default_server;
    server_name example.com;

    location / {
    #include proxy_params;
    proxy_pass http://localhost:5000;
    proxy_buffering off;
    chunked_transfer_encoding off;
    }
}

前端接收和解析响应的代码如下:

        <script>
            (async function(){
                const response = await fetch("{{ url_for('realtimedata') }}");
                const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
                while (true) {
                    const {done, value} = await reader.read();
                    if (done) break;
                    try {
                        console.log('Received', value);
                        const rtd = JSON.parse(value);
                        console.log('Parsed', rtd);
                    } catch(err) {
                        console.log(err);
                    }
                }
            })()
        </script>

关于来自 go 服务器的数据需要注意的一点是,一个服务提供具有 96 个字段的数据对象,而另一个服务提供具有 200 个字段的数据。这使得传入的流响应具有可变长度(以字节为单位)。

我想使用 gunicorn,因为我可能同时有多个听众。使用 gunicorn 解决了所有响应都发送到网络服务器但它们分布在活动客户端之间的问题。所以每个客户都会得到不同的响应,但不是全部。

编辑: 我尝试将 goserver 上的响应对象大小更改为与两个服务相同,但截断仍然发生。可变长度似乎不是问题。我也尝试过使用 uWSGI 而不是 gunicorn 来执行此操作,但问题仍然存在。我什至设置了uwsgi_buffering off;,问题仍然存在。

更新: 我用 Apache2 而不是 Nginx 运行了最小的可重现示例,我遇到了同样的问题。也许问题出在其他地方。

【问题讨论】:

    标签: nginx flask grpc gunicorn gevent


    【解决方案1】:

    查看您的 python 代码,似乎使用 websockets 将数据从后端推送到前端会更好。我已经重写了您的后端以使用 FastAPI 而不是 Flask 并修改了 nginx 配置。

    main.py

    import asyncio
    import dependencies.rpc_pb2 as r
    import dependencies.rpc_pb2_grpc as rpc
    from fastapi import FastAPI, WebSocket, Request
    from fastapi.templating import Jinja2Templates
    import grpc
    import json
    import os
    
    os.environ["GRPC_SSL_CIPHER_SUITES"] = 'HIGH+ECDSA'
    
    app = FastAPI()
    templates = Jinja2Templates(directory="templates")
    server_addr = "localhost"
    server_port = 3567
    
    @app.get("/")
    def read_root(request: Request):
        return templates.TemplateResponse("index.html", {"request": request})
    
    def parseRtd(rtd):
        rtdDict = {}
        rtdDict["source"] = rtd.source
        rtdDict["is_scanning"] = rtd.is_scanning
        rtdDict["timestamp"] = int(rtd.timestamp)
        rtdDict["data"] = {}
        for key, v in rtd.data.items():
            rtdDict["data"][int(key)] = {"name": v.name, "value": v.value}
        return rtdDict
    
    def get_rtd():
        channel = grpc.insecure_channel(f"{server_addr}:{server_port}")
        stub = rpc.RpcServiceStub(channel)
        for rtd in stub.SubscribeDataStream(r.SubscribeDataRequest()):
            yield parseRtd(rtd)
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        await websocket.send_json({"test": "this is a test"})
        it = get_rtd()
        while True:
            await asyncio.sleep(0.1)
            payload = next(it)
            await websocket.send_json(payload)
    

    index.html

    <html>
        <head>
            <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.4.0/socket.io.js" integrity="sha512-nYuHvSAhY5lFZ4ixSViOwsEKFvlxHMU2NHts1ILuJgOS6ptUmAGt/0i5czIgMOahKZ6JN84YFDA+mCdky7dD8A==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
        </head>
        <body>
            <script>
                var ws = new WebSocket("ws://localhost:5000/ws");     
                ws.onopen = function () {
                    console.log("websocket was open");
                };
                ws.onclose = () => {
                    console.log("Websocket was closed!");
                }
                ws.onerror = (error) =>{
                    console.error("Websocket error: " + JSON.stringify(error));
                };
                ws.onmessage = (message) => {
                    console.log("MSG: " + message.data );
                };
            </script>
        </body>
    </html>
    

    webserver.conf

    server {
        listen 80 default_server;
        server_name example.com;
    
        location / {
            include proxy_params;
            proxy_pass http://localhost:5000;
        }
    
        location /ws {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $host;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_pass http://localhost:5000;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-08-29
      • 2015-04-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多