【问题标题】:Time sleep in reponse Stream with ReactPHP使用 ReactPHP 响应流中的时间睡眠
【发布时间】:2021-06-21 18:25:46
【问题描述】:

我正在使用 ReactPHP 和响应流。

我已经成功创建了一个 POC 来生成这样的响应流:

function (int $chunks, int $sleep) use ($loop) {
    $stream = new ThroughStream();
    $loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
        static $i = 0;
        $stream->write(microtime(true) . PHP_EOL);
        $i++;
        if ($i >= $chunks) {
            $loop->cancelTimer($timer);
            $stream->end();
        }
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

调用curl -X GET -i https://localhost:9091/sleepstream/10/1会产生

1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363

每行在前一行之后 1 秒打印。不错。

现在我正在尝试创建一个更逼真的控制器:

function (int $sleep, ServerRequestInterface $request) use ($loop) {
    $body = $request->getBody();
    assert($body instanceof \React\Stream\ReadableStreamInterface);
                    
    $in = new \Clue\React\NDJson\Decoder($body);
                    
    $stream = new ThroughStream();

    $in->on('data', function ($data) use ($stream, $loop) {
        $data->ts = time();
        $loop->futureTick(function () use ($stream, $data) {
            echo "DATA\n";
            $stream->write(\json_encode($data) . PHP_EOL);
            sleep(1);
        });
    });
                    
    $in->on('end', function() use ($stream, $loop) {
        $loop->addTimer(2, function () use ($stream) {
            $stream->end();
        });
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

我正在使用 NDJSON 输入文件 users.ndjson

{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}

所以这么称呼:

curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1

给出:

{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}

但是响应不是每秒逐行收到,而是一次完成...

正在显示服务器日志:

DATA
DATA
DATA
DATA
DATA

有好的时机(每秒 1 个)。

我不明白为什么第二个控制器中的响应流中断了。

编辑: 我开始理解为什么响应流在第二个控制器中被破坏了。

@WyriHaximus 指出sleep() 阻塞了循环。所以我现在的问题是:我如何才能让记录准备好流式传输并模拟睡眠的循环(我试图模拟输入流和输出流之间的计算滞后以验证某个关键点)。

【问题讨论】:

    标签: streaming reactphp


    【解决方案1】:

    ReactPHP 核心维护者在这里。您在代码中的sleep(1); 将事件循环阻塞整整一秒钟。并且由于事件循环无法将数据写出,因为它在尝试时被阻塞。此外,如果添加time() 调用以进行调试,您可能希望使用microtime(true) 更好地可视化写入队列之间的时间。除非我遗漏了某些内容并且您有充分的理由,否则您通常不希望延迟写出处理后获得的数据,因为它会占用您不需要保留的内存。

    【讨论】:

    • 谢谢,感谢您的回答和工作的极大平静。我正在尝试模拟计算滞后,以确保我可以在服务器仍在流上执行操作时开始在客户端处理结果(批量 PUT 的 POC)。我认为输入流没有被分块编码,所以我的输入流被缓冲,但这是另一个问题。我能做些什么来替换 sleep() ?我尝试了一些暂停/恢复逻辑,但没有按预期进行。
    • 您可以使用计时器而不是未来的滴答声来模拟睡眠,但效果相同。或者,您可以使用带有周期性计时器的队列来延迟消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-25
    • 1970-01-01
    相关资源
    最近更新 更多