【Question title】: websocket asynchronous feedback during a long process
【Posted】: 2019-03-06 14:27:20
【Question】:

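I am trying to implement feedback in a web page that lets the user start a long process from an Excel sheet (sigh, yes...). Each row of data takes about 1 second to process, and a typical sheet has between 40 and 100 rows, so the overall processing time can exceed one minute.

I display a preview of the data in the page, start the process through a websocket, and would like to show the progress through the same websocket.

The processing itself is done by an external package and the page is minimal, so I wrapped everything in a single Lite file.

My problem is that the long processing started in the websocket route blocks the feedback until it has finished, and all the progress events are then sent at the same time at the end. As far as I understand, this is related to Mojolicious's event loop, and I should start the processing separately to avoid freezing the handling of the websocket.

Note that I have also tried a separate feedback channel using EventSource to push some progress to the client during the processing, but it shows the same behavior: everything arrives at once at the end.

Here is a simplified version of my code, where I use sleep() to simulate the long process. I start it with
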
perl mojo_notify_ws.pl daemon

Can you suggest how to modify the websocket route to allow real-time feedback?

use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);

use Data::Dumper;

$|++;

any '/' => sub {
    my $c = shift;
    $c->render('index');
};

my $peer;
websocket '/go' => sub {
    use Carp::Always;
    my $ws = shift;

    $peer = $ws->tx;
    app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);

    # do not subscribe to 'text' else 'json' won't work
    #$ws->on(text => sub {
    #    my ($ws, $msg) = @_;
    #    app->log->debug("Received text from websocket: `$msg`");
    #        });

    # $peer->send('{"type": "test"}');
    # say 'default inactivity timeout='. (p $ws->inactivity_timeout());
    $ws->inactivity_timeout(120);

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

        # simulate
        my $loop = Mojo::IOLoop->singleton;

#        $loop->subprocess( sub {
#            my $sp = shift;

        for my $cell (1..3) {
            # $loop->delay( sub {
                app->log->debug("sending cell $cell");
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } );
                sleep(2);
                # $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
            # });
        };

#        }, sub {} );#subprocess

        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    });

    $ws->on(finish => sub {
        my ($ws, $code, $reason) = @_;
        $reason = '' unless defined $reason;
        app->log->debug("Client disconnected: $code ($reason)");
    });

    app->log->debug('Reached end of ws route definition');
};

app->start;

__DATA__

@@ index.html.ep
<html>
    <head>
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
    <script>
var timerId = 0;
function keepAlive(ws) { 
    var timeout = 20000;  
    if (ws.readyState == ws.OPEN) {  
        ws.send('ping');  
    }  
    timerId = setTimeout(function(){keepAlive(ws);}, timeout);  
}  
function cancelKeepAlive() {  
    if (timerId) {  
        clearTimeout(timerId);  
    }  
}

function flagCell(cell, result){
    var id='#CELL_' + cell;
    var cell = $(id);
    if(cell) {
        if (result=='OK') {
            cell.css('color', 'green');
            cell.text('⯲');
        } else {
            cell.css('color','red');
            cell.text('✘');
        }
    }
}

function process(){
    //debugger;
    console.log('Opening WebSocket');
    var ws = new WebSocket('<%= url_for('go')->to_abs %>');

    ws.onopen = function (){
        console.log('Websocket Open');
        //keepAlive(ws);
        ws.send(JSON.stringify({cmd: "let's go Perl"}));
    };
    //incoming
    ws.onmessage = function(evt){
        var data = JSON.parse(evt.data);
        console.log('WS received '+JSON.stringify(data));
        if (data.type == 'ticket') {
            console.log('Server has send a status');
            console.log('Cell:'+data.cell + ' res:' + data.result);

            flagCell(data.cell, data.result);
        } else if (data.type == 'end') {
            console.log('Server has finished.');
            //cancelKeepAlive();
            ws.close();
        } else {
            console.log('Unknown message:' + evt.data);
        }
    };
    ws.onerror = function (evt) {
        console.log('ws error:', evt.data);
    }
    ws.onclose = function (evt) {
        if(evt.wasClean) {
            console.log('Connection closed cleanly');
        } else {
            console.log('Connection reseted');
        }
        console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
    }
}

    </script>
    </head>
    <body>
        <button type=button id='upload' onclick="process();">Go</button><br>
        <div style='font-family:sans;'>
            <table border="1px">
              <tr><td id="CELL_1">&nbsp;</td><td>Foo</td></tr>
              <tr><td id="CELL_2">&nbsp;</td><td>Bar</td></tr>
              <tr><td id="CELL_3">&nbsp;</td><td>Baz</td></tr>
            </table>
        </div>
    </body>
</html>

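Edit:

Grinnz has provided a suitable solution, but for the record, here is my attempt using the Mojo::IOLoop::Subprocess callbacks, which gave me no feedback at all. I am running on Linux; Subprocess seems to fork, and the parent process seems to terminate the websocket immediately. Edit: no, I finally found that the $ws->send() call was in the wrong place: it should be in the second sub {}, which runs on the parent side, not in the first one, which runs in the child process. This code should be refactored to use one subprocess per loop iteration, with a final step that signals the end.

Here is the modified on(json) handler: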

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    # my $loop = Mojo::IOLoop->singleton;
    my $subprocess = Mojo::IOLoop::Subprocess->new;
    app->log->debug("we are pid $$");
    $subprocess->run( 
        sub {
            my $sp = shift;
            for my $cell (1..3) {
                app->log->debug("starting process for cell $cell in pid $$");     
                sleep(2);
                app->log->debug("sending cell $cell to ws");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
                                                   # and should be in the second sub{}
            };
        },
        sub {
            my ($sp, $err, @results) = @_; 
            $ws->reply->exception($err) and return if $err;
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        });  
    # Start event loop if necessary
    $subprocess->ioloop->start unless $subprocess->ioloop->is_running;       
});

And the corresponding log:

[Wed Oct  3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct  3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct  3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct  3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct  3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct  3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct  3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct  3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct  3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct  3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct  3 19:52:04 2018] [debug] Client disconnected: 1005 ()

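I also tried Mojo::IOLoop->delay to build a complex sequence of steps, in a way similar to the Promise solution, but this one also sends all the notifications at once at the end: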

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    app->log->debug("we are pid $$");

    my @steps;
    for my $cell (1..3) {
        push @steps, 
            sub {
                app->log->debug("subprocess for cell pid $cell");
                # my $sp = shift;
                my $delay = shift;
                sleep(2);
                app->log->debug("end of sleep for cell $cell");
                $delay->pass($cell % 2 ? 'OK' : 'NOK');
            },
            sub {
                my $delay = shift;
                my $result = shift;

                app->log->debug("sending cell $cell from pid $$ - result was $result");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $result
                };
                $ws->send( { json => $payload } );
                $delay->pass;
            };
    }

    # add final step to notify end of processing
    push @steps, sub {
        my $delay = shift;
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
        $delay->pass;
    };

    my $delay = Mojo::IOLoop::Delay->new;
    app->log->debug("Starting delay...");
    $delay->steps( @steps );
    app->log->debug("After the delay");

});

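【Comments】:

  • What was your "other attempt" with Subprocess? Since a subprocess cannot communicate with the websocket, did you try it the way I mentioned?
  • @Grinnz: I have just updated my question to show my different attempts with Subprocess and delay(). Your Promise solution is close to the expected result, though: at least it provides a kind of asynchronous feedback. See my additional question in the comments on your answer.
  • Delays are not really recommended anymore, since they have been superseded by the promise-based tools in Mojo, and they have historically been a concept people found harder to grasp. The reason it still blocks the websocket communication is that delays, like promises, only organize when code runs; the code still blocks while it is running, which is what a subprocess is for.

Tags: perl websocket mojolicious mojolicious-lite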


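【Answer 1】:

I made a few small changes to your updated example to make it work as expected. You can use the progress feature of the Subprocess module to make sure the right data is sent asynchronously through the websocket from the long-running subprocess.

The code now works as I expect: each time the subprocess completes an iteration, the table status is updated on the client side.

The relevant part of the source code looks like this: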

$ws->on(
    json => sub {
        my ( $ws, $msg ) = @_;
        app->log->debug( 'Received from websocket:', Dumper( \$msg ) );
        unless ($msg) {
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );

        # my $loop = Mojo::IOLoop->singleton;
        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run(
            sub {
                my $sp = shift;
                for my $cell ( 1 .. 3 ) {
                    app->log->debug(
                        "starting process for cell $cell in pid $$");
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type   => 'ticket',
                        cell   => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $sp->progress($payload);
                }
            },
            sub {
                my ( $sp, $err, @results ) = @_;

                #$ws->send( { json => $payload } );
                $ws->reply->exception($err) and return if $err;
                app->log->debug('sending end of process ->websocket');
                $ws->send( { json => { type => 'end' } } );
            }
        );

        # Forward each progress payload emitted by the child to the websocket
        $subprocess->on(
            progress => sub {
                my ( $subprocess, $payload ) = @_;
                $ws->send( { json => $payload } );
            }
        );

        # Start event loop if necessary
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
    }
);

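【Comments】:

  • I only noticed your answer recently. This feature was only added in November 2018, after I asked my question. It is very convenient now. Thanks!
【Answer 2】: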

It is not possible to magically make Perl code non-blocking. That is why your blocking operation holds up the websocket responses and the event loop.
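
To see the effect in isolation, here is a minimal sketch (not taken from the question's code) that runs a 0.1 s recurring timer on the Mojo::IOLoop singleton and then calls a blocking sleep from another callback. The timer values and the variable names (@ticks, $max_gap) are arbitrary choices made for illustration.

```perl
use strict;
use warnings;
use Mojo::IOLoop;
use Time::HiRes qw(time);

# Record when a 0.1 s recurring timer actually fires.
my @ticks;
my $start = time;
Mojo::IOLoop->recurring(0.1 => sub { push @ticks, time - $start });

# One blocking call inside a callback: no other callback can run meanwhile.
Mojo::IOLoop->timer(0.25 => sub { sleep 1 });

# Stop the loop once the experiment is over.
Mojo::IOLoop->timer(1.6 => sub { Mojo::IOLoop->stop });
Mojo::IOLoop->start;

# Largest gap between two consecutive ticks.
my $max_gap = 0;
for my $i (1 .. $#ticks) {
    my $gap = $ticks[$i] - $ticks[$i - 1];
    $max_gap = $gap if $gap > $max_gap;
}
printf "ticks: %d, largest gap: %.2f s\n", scalar @ticks, $max_gap;
```

The recurring timer should go silent for roughly the duration of the sleep, which is the same starvation the websocket suffers while the handler sleeps between sends.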

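A single subprocess will not work for this, because only the original worker process that handled the request can respond to the websocket, and a subprocess can only return once. However, you can use a subprocess to prepare each response you want to send. Your use of subprocesses is not quite correct, though.

The first subroutine passed to the subprocess is executed in a fork, so it does not block the main process. Once the subprocess has finished, the second subroutine is executed in the parent and receives the return value of the first subroutine. This is where you need to send your response.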
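
The split between the two callbacks can be seen in a small standalone sketch. It assumes only Mojo::IOLoop->subprocess as used elsewhere in this answer; the @events array and the payload contents are made up for illustration, and there is no websocket involved.

```perl
use strict;
use warnings;
use Mojo::IOLoop;

my @events;

Mojo::IOLoop->subprocess(
    sub {
        # Runs in the forked child. Changes to @events made here would be
        # lost; only the return value travels back to the parent.
        my $subprocess = shift;
        sleep 1;    # stands in for the real blocking work
        return { cell => 1, result => 'OK' };
    },
    sub {
        # Runs in the parent once the child has exited.
        my ($subprocess, $err, $payload) = @_;
        push @events, $err
            ? "error: $err"
            : "parent got cell $payload->{cell}: $payload->{result}";
        Mojo::IOLoop->stop;
    }
);

# Reached immediately: the child has not even been started yet.
push @events, 'code after subprocess() call';

Mojo::IOLoop->start;
print "$_\n" for @events;
```

In a websocket handler, the place where this sketch pushes onto @events in the second callback is where the $ws->send call would go.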

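Any code outside of that will be executed before the subprocess has even started, because this is asynchronous code; you need to sequence the logic through callbacks. You can use promises to make complicated sequencing simpler.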

use Mojo::Promise;

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

    my $promise = Mojo::Promise->new->resolve; # starting point
    # attach follow-up code for each cell, returning a new promise representing the whole chain so far
    for my $cell (1..3) {
        $promise = $promise->then(sub {
            my $promise = Mojo::Promise->new;
            Mojo::IOLoop->subprocess(sub {
                app->log->debug("sending cell $cell");
                sleep(2);
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                return $payload;
            }, sub {
                my ($sp, $err, $payload) = @_;
                return $promise->reject($err) if $err; # indicates subprocess died
                $ws->send( { json => $payload }, sub { $promise->resolve } );
            });

            # here, the subprocess has not been started yet
            # it will be started when this handler returns to the event loop
            # then the second callback will run once the subprocess exits
            return $promise;
        });
    }

    # chain from last promise
    $promise->then(sub {
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    })->catch(sub {
        my $err = shift;
        # you can send or log something here to indicate an error occurred in one of the subprocesses
    });
});

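If appropriate, I can go into more detail on some other options: Mojo::IOLoop::ReadWriteFork, which will let you start just one subprocess and continuously receive STDOUT from it (you will need to serialize your payload yourself to send it on STDOUT, for example with Mojo::JSON); or a regular subprocess that sends status information back to the parent through an external pub/sub broker that both processes can connect to, such as Postgres, Redis, or Mercury (this also requires serialization).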
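
The idea behind the first option (one child process that keeps writing serialized payloads for the parent to read) can be sketched with core Perl only. This is not the Mojo::IOLoop::ReadWriteFork API: it uses a plain pipe and fork, with JSON::PP standing in for Mojo::JSON, and the parent reads in a simple blocking loop. In a Mojolicious application the read end would have to be watched by the event loop instead.

```perl
use strict;
use warnings;
use IO::Handle;
use JSON::PP qw(encode_json decode_json);

pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pid = fork;
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: do the long work and report after each row.
    close $reader;
    $writer->autoflush(1);
    for my $cell (1 .. 3) {
        sleep 1;    # stands in for the real per-row processing
        print {$writer} encode_json({
            type   => 'ticket',
            cell   => $cell,
            result => $cell % 2 ? 'OK' : 'NOK',
        }), "\n";
    }
    close $writer;
    exit 0;
}

# Parent: one JSON document per line, readable as soon as the child writes it.
close $writer;
my @received;
while (my $line = <$reader>) {
    my $payload = decode_json($line);
    push @received, $payload;
    print "cell $payload->{cell}: $payload->{result}\n";
}
close $reader;
waitpid $pid, 0;
```

One JSON document per line keeps the framing trivial: the parent never has to guess where one payload ends and the next begins.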

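【Comments】:

  • Awesome, this is very close to what I wanted! The last thing is the sequencing of the promises. I thought at first that all results were sent at the same time, but actually all the promises seem to be started immediately, since using sleep(2 * $cell) produces the expected delayed feedback. The actual processing consists of creating tasks in a Jira server; do you have a solution in mind to process them sequentially rather than in parallel, to avoid causing a DoS by sending all the requests at once? Using sleep($cell) feels a bit ugly. :)
  • Do you mean you want to wait for each subprocess to complete before starting the next one?
  • To wait for each promise to resolve before starting the next, you attach a ->then callback that creates the next promise to wait on, and the final ->then/->catch is attached to the last promise in this chain. I have updated the example code with this logic.
  • Perfect! That is what I was looking for. You made me understand Mojo::Promise, which I had come across during my research but did not understand. I had been looking for a decent solution for 2 weeks... Thanks a lot! Mojo::IOLoop::ReadWriteFork looks interesting too; I may have some use for it in a different use case and will evaluate it later.
【Answer 3】: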

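You can use a thread instead of a subprocess to do the work. After creating the thread, you need a loop that updates the progress through the websocket.

If you handle a critical workload that must complete in all circumstances (websocket gone, network down, etc.), you should delegate it to another daemon that persists and communicates its state through a file or a socket.

If it is a non-critical workload that you can easily restart, this may be a suitable template for you.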

# These belong at the top of the module
use threads;
use Thread::Queue;

my $queue  = Thread::Queue->new();
my $worker = threads->create(sub {
  # dummy workload. do your work here
  my $count = 60;
  for (1..$count) {
    sleep 1;
    $queue->enqueue($_/$count);
  }

  # undef to signal end of work
  $queue->enqueue(undef);

  return;
});

# blocking dequeuing ends when retrieving an undef'd value
while(defined(my $item = $queue->dequeue)) {
  # update progress via websocket
  printf("%f %%\n", $item);
}

# join thread
$worker->join;

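【Comments】:

  • Interesting, but in my case it would just replace the blocking processing with a blocking loop over the statuses returned through the queue.
  • I agree: blocking is bad in this case. Another way to avoid blocking here is to create a second thread that "watches" (dequeues from) the first one and does the progress updates. Both threads must be detached rather than joined, to avoid blocking the main process. An important caveat when using threads here is that all packages loaded at that point will be cloned into a new interpreter. This can affect your system if your resources (memory) are limited.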