催化剂和长轮询(彗星)应用
概述
在 2013 年年中,我决定将反向 AJAX(更进一步的 Comet)功能支持合并到我的 Catalyst 应用程序中,但我在 Web 上发现的关于它的信息非常少。于是我开始一块块地收集资料,这迫使我更深入地了解Catalyst框架。然后出现了一些来自 John Napiorkowski(Catalyst 的现任维护者)的非阻塞代码的好例子,这些例子对这个话题做了很多澄清。所以我编写了一个简单的服务器,在 Twiggy 上运行并为客户端提供长期的websockets 连接。这个解决方案并不假装是最好的,甚至是好的。这只是一个可行的解决方案。代码未重构,按原样提供。它可以用作构建更强大和更可靠的彗星应用程序的基础。它还可以通过多种方式进行改进。因此,如果您发现一些错误或改进建议,请告诉我。
简介
在本节中,我想概述一下这段代码的背景。
我有一个实现社交网络功能的 Catalyst 应用程序。它主要使用 AJAX 请求从服务器获取数据。它每分钟都会发出一个 AJAX 请求,以获取登录用户的数据更新。如果会话已过期,用户将被注销并重定向到登录页面。对于已登录的用户,我的应用程序中有一部分用户需要定期更新数据。在此页面上使用 comet 并不重要,我可以轻松使用 AJAX(一个更简单的选项,但与网络延迟和带宽以及发送不必要的请求有关),但我决定尝试一下。
如果您在预分叉服务器上运行 Catalyst 应用程序,您将拥有许多为您的客户提供服务的服务器。如果您想在 Catalyst 应用程序中建立长期连接,这意味着您将阻止您的应用程序的一个实例,同时保持此连接打开。如果您将只有少数客户端和大量硬件资源,您可以预分叉您的应用程序。但是,如果您想拥有数百个,更不用说数千个并发连接,此解决方案可能不适合您,因为您很快就会耗尽资源。这意味着您的 Catalyst 应用程序必须在非阻塞服务器上运行,或者您的客户端(浏览器)应该与另一台服务器通信,该服务器不会消耗太多硬件资源,并且可以为每个客户端分配一个自身的实例,同时保持连接打开。或者客户端可以连接到在事件循环中运行并以异步方式为用户响应新数据的服务器。
我能找到的 Catalyst 应用程序的唯一非阻塞服务器是 Twiggy。此服务器基于 AnyEvent。 AnyEvent 是 Perl 中事件驱动编程模型的框架,它使 Twiggy 服务器可以以非阻塞异步方式为客户端提供服务。例如,对于需要一些时间为用户获取数据的请求很有用。服务器不会阻止并等待数据可供用户使用,而是继续侦听新的传入请求,一旦某个用户的数据准备好,它将被发送给适当的用户。
可能不是在 Twiggy 上运行整个 Catalyst 应用程序的最佳想法。一个人可能想要使用一些健壮的、经过良好测试的服务器(nginx 或 Apache 或其他)并在这些前端服务器后面运行您的 Catalyst 应用程序作为 FastCGI 进程,例如(我选择的选项)。所以我决定运行 Twiggy 实例并将 Comet 流量定向到它(我尝试在 nginx 代理后面使用 Twiggy 进行 websockets 连接,但它没有以某种方式工作,所以我没有进一步调查就放弃了它)。第一个问题是身份验证。因为它是在 Catalyst 应用程序中完成的,而不是在前端服务器上,这意味着我的 Comet 应用程序必须以某种方式知道用户是否经过身份验证。
认证
Catalyst 的认证模块是Catalyst::Plugin::Authentication。它负责在您的应用程序中对用户进行身份验证和授权。你肯定会和它一起使用会话模块Catalyst::Plugin::Session。这允许跨 HTTP 请求保留应用程序数据(包括用户经过身份验证)。这个会话模块分为两部分:状态和存储。第一个允许选择如何跨不同的 HTTP 请求(很可能使用 cookie)保存应用程序的状态。第二个选项允许您选择WHERE 来存储用户会话的数据。
所以我使用Session::State::Cookie 表示状态,Session::Store::FastMmap 表示存储。这意味着当用户进行身份验证时,他会获得一个会话 ID,一个秘密字符串,他将在每个 HTTP 请求中将其发送到一个 HTTP 标头到服务器。此会话 ID 将在一段时间内有效,只要它有效,它就会唯一分配给某个用户。然后在每个传入的请求中,用户的数据将通过Session::Store::FastMmap 从 mmap'ed 文件中恢复。该文件充当共享内存进程间缓存。如果您的整个应用程序在单个服务器上运行,则此解决方案 (FastMap) 很好,但如果您进行负载平衡,例如,您可能希望使用其他解决方案(如 Catalyst::Plugin::Session::Store::DBI)。
所以我决定破解在这个会话数据上。在我的 Comet 应用程序中,我可以访问此会话数据并检查用户是否已通过身份验证。这是在以下子中完成的。
sub _check_session {
my ($sid, $this_user_id) = @_;
my $return = 0;
my $user_session = $session->get("session:$sid");
if ( $user_session ) {
## Check user realm existence
return $return unless ( $user_session->{__user_realm} );
## Check user presence
return $return unless ( $user_session->{__user} );
## Check session expiration time
my $session_expires_time = $session->get("expires:$sid");
my $now = time();
if ( $now > $session_expires_time ) {
return $return;
}
## Check if it is still the same user
if ( $this_user_id && ($this_user_id ne $user_session->{__user}->{id} ) ) {
return $return;
}
else {
$return = $user_session->{__user};
}
}
return $return;
}
浏览Catalyst::Plugin::Session和Catalyst::Plugin::Authentication我得出的结论是至少需要检查会话数据中的以下键:
-
__user_realm:如果用户在至少一个领域进行了身份验证,则此密钥存在于会话哈希中
-
__user:如果用户被认证,这个key代表用户数据(来自认证模块的::Store部分,很可能来自DBIx)
-
"expires:$sid" 表示会话到期时的时间戳
$session 是一个允许访问我们的 mmap 文件的对象:
my $session = Cache::FastMmap->new( raw_values => 0, share_file => ('/tmp/myapp/session_data') );
我们对可以在会话文件中查找的两条数据感兴趣:"session:$sid" 是会话数据的键,"expires:$sid" 是会话到期的时间戳。
所以现在,当浏览器尝试与我们的 Comet 应用建立 websocket 连接,我们必须调用这个 sub。如果用户通过身份验证,将与服务器建立 websocket 连接。虽然当用户在浏览器中注销或导航离开 Comet 应用程序时,我的应用程序会自动关闭 websocket 连接,但我还是决定每$interval 秒检查一次会话 ID。因此,如果恶意用户自己打开 websocket 连接,他将无法使用它。对于用户 A 注销并且用户 B 使用与用户 A 相同的会话 ID 登录的情况,并且所有这些都发生在下一次会话检查之前,会话将仍然处于活动状态,但将与另一个用户相关。在这种情况下,我们必须检查会话是否对应于最初建立 websocket 连接的用户:
if ( $this_user_id && ($this_user_id ne $user_session->{__user}->{id} ) ) {
return $return;
}
else {
$return = $user_session->{__user};
}
PSGI、Plack::Builder、Plack::Request
将我的 Comet 应用程序实现为 PSGI 应用程序是很自然的选择。我假设您熟悉此规范。
假设您希望您的应用程序将不同的 URL 映射到不同的应用程序,例如,当您的网站中有多个区域,每个区域都需要它自己的彗星逻辑时。你可以使用Plack::Builder:
use Plack::Builder;
...
## 1st app entrance point
my $psgi_app = sub {
my $env = shift;
...
}
...
builder {
## mount 1st app
mount "/comet/first_app" => $psgi_app;
}
现在您可以挂载任意数量的应用程序,每个应用程序对应一个不同的路径 (URL)。
如您所知,PSGI 应用程序的第一个参数是 $env,它是一个环境变量,一个包含不同路径的哈希与 HTTP 请求有关的密钥和与 PSGI 规范有关的密钥。使用它我们可以创建一个 Plack 请求对象,它允许我们访问不同的请求数据和 cookie。其中一个 cookie 将包含一个会话 ID,它是身份验证检查的起点。
## Request object
my $req = Plack::Request->new($env);
## session id
my $sid = $req->cookies->{myapp_session};
## HTTP origin header
my $req_base = $env->{HTTP_ORIGIN};
延迟和流式响应
如您所知,PSGI 应用程序应该返回一个元组(HTTP 状态、HTTP 标头和 HTTP 正文数据)。但是要启用服务器推送,应用程序应该返回一个回调作为其响应。然后,此回调将由底层服务器执行。现在,您可以在应用程序中利用事件循环将数据流式传输到客户端。
为了能够实现 websocket 服务器,必须使用 PSGI 扩展 psgix.io,它可以访问原始互联网套接字,这样就可以对流数据的完全访问。因为在 websocket 规范中,必须在初始握手连接期间从 HTTP 升级到 ws 协议,因此需要对套接字进行低级访问。
my $psgi_app = sub {
my $env = shift;
my $fh = $env->{'psgix.io'} or return [500, [], []];
## Create websocket handshake
my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
$hs->parse($fh) or return [400, [], [$hs->error]];
return sub {
my $responder = shift;
...
}
}
所以我们创建了一个对象,它负责处理 websocket 协议的消息数据格式,这些数据在客户端和服务器之间交换。该对象使用我们的原始互联网套接字进行初始化,以便它可以完成 HTTP 升级。然后我们返回一个回调,这将是我们的延迟响应。
comet 应用程序,服务器端
这是整个彗星psgi应用程序:
use Plack::Builder;
use Plack::Request;
use AnyEvent;
use Protocol::WebSocket::Handshake::Server;
use Cache::FastMmap;
use JSON;
use Template;
use Log::Dispatch;
use Data::Dumper;
use DateTime;
use FindBin qw($Bin);
use lib "$Bin/../lib";
use myapp::Schema;
use warnings;
use strict;
## Session data
my $interval = 3;
my $session = Cache::FastMmap->new( raw_values => 0, share_file => ('/tmp/myapp/session_data') );
## Database connection, for example with a Postgres DB
my $db_schema = myapp::Schema->connect( {
dsn => 'dbi:Pg:dbname=myapp_test',
user => 'my_login',
password => 'my_passwd',
pg_enable_utf8 => 1
} );
## Logging object
my $log = Log::Dispatch->new( outputs => [ [ 'File', min_level => 'debug', filename => '/var/log/myapp_test/comet' ] ] );
## Adjust this sub correspondingly if Session::Store has been changed.
sub _check_session {
my ($sid, $this_user_id) = @_;
my $return = 0;
my $user_session = $session->get("session:$sid");
## Check if the sid and the user email match
if ( $user_session ) {
## Check user realm existence
return $return unless ( $user_session->{__user_realm} );
## Check user presence
return $return unless ( $user_session->{__user} );
## Check session expiration time
my $session_expires_time = $session->get("expires:$sid");
my $now = time();
if ( $now > $session_expires_time ) {
return $return;
}
## Check if it is still the same user
if ( $this_user_id && ($this_user_id ne $user_session->{__user}->{id} ) ) {
return $return;
}
else {
$return = $user_session->{__user};
}
}
return $return;
}
## 1st app entrance point
my $psgi_app = sub {
my $env = shift;
my $fh = $env->{'psgix.io'} or return [500, [], []];
## Create websocket handshake
my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
$hs->parse($fh) or return [400, [], [$hs->error]];
return sub {
my $responder = shift;
## App data
my ($w, $hd, $input_params, $req, $sid, $user_id, $ret, $time_lapsed, $req_base);
## Clean up the websocket local environment
my $clean_up = sub {
$log->debug("\nCleaning up...\n");
## Destroy websocket
$hd->destroy;
## Remove timer from event loop
undef $w;
};
$hd = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my ($hd, $fatal, $msg) = @_;
$clean_up->();
}
);
## Send server websocket handshake
$hd->push_write($hs->to_string);
## Websockets connection is initialized and is ready for data to be sent
#$hd->push_write( $hs->build_frame( buffer => encode_json( { 'status' => "Connection init..." } ) )->to_bytes );
## Get request data
$req = Plack::Request->new($env);
$sid = $req->cookies->{myapp_session};
$req_base = $env->{HTTP_ORIGIN};
## Check if user is authenticated
unless ( $ret = _check_session($sid, undef) ) {
$clean_up->();
}
else {
$user_id = $ret->{id};
}
$time_lapsed = 0;
## Template toolkit
my $template = Template->new({
INCLUDE_PATH => "$Bin/../root/templates",
VARIABLES => {
req_base => $req_base,
user_id => $user_id,
user_lang => $ret->{language}
},
ENCODING => 'utf8',
});
## Input parameters and recieve user's data.
$hd->on_read(sub {
(my $frame = $hs->build_frame)->append($_[0]->rbuf);
while (my $message = $frame->next) {
my $decoded_data = eval { decode_json $message };
## If it's not a valid json - exit
if ($@) {
$clean_up->();
}
else {
## New connection
if ( $decoded_data->{is_new} ) {
$input_params = $decoded_data;
$stash = {
template_data => "some data"
};
my $tt_output;
$template->process( "template_path", $stash, \$tt_output );
$hd->push_write( $hs->build_frame( buffer => encode_json( { 'init_data' => $tt_output } ), max_payload_size => 200000 )->to_bytes );
}
## Else - additional data are sent from the client
else {
}
}
}
});
## THIS APP'S MAIN LOGIC
## As an example, let's track if user has changed his/her name and return a message to the browser
my $app_logic = sub {
my $this_params = shift;
if ( $user_id ) {
my $rs = $db_schema->resultset('User')->search( { id => $user_id } )->single;
if ( $rs->first_name ne $ret->{first_name}) {
$hd->push_write( $hs->build_frame( buffer => encode_json( { 'data' => "User changed his name!" } ) )->to_bytes );
}
}
};
## Any event logic
$w = AnyEvent->timer (
interval => $interval,
after => $interval,
cb => sub {
## Check every half a minute if the user is still authenticated
if ( $time_lapsed > 30 ) {
$time_lapsed = 0;
unless ( $ret = _check_session($sid, $user_id) ) {
$clean_up->();
}
else {
## Check if user' object has been changed (e.g. his language etc.)
}
}
## Execute main logic
$app_logic->($input_params);
$time_lapsed += $interval;
}
);
};
};
builder {
## mount 1st app
mount "/comet/myapp" => $psgi_app;
}
因此,我们通过初始化一些常见对象(如数据库句柄和会话对象)来启动程序。
当 websocket 连接终止时,我们不想响应与其相关的事件,因此我们将它们从事件循环中删除.这就是子参考$clean_up 中所做的。然后我们定义一个AnyEvent::Handle 对象并监听它的on_read() 回调,每次新数据从客户端到达时都会触发它。
因为我希望能够使用相同的模板来为我的 Catalyst 生成 HTML应用程序和我的彗星应用程序,我创建了一个模板对象并使用在 Catalyst 对应项中必须相同的变量对其进行初始化。
第一次调用 on_read() 回调是在客户端打开 websocket 连接时。在 javascript 部分,我们为此定义了一个特殊键,并在新请求时向客户端发送初始数据(在我的情况下,它是稍后我获得彗星更新的数据)。
此外,我们创建一个 AnyEvent 计时器对象,它将定期执行我们的主逻辑应用程序$app_logic。它还将检查用户是否仍然经过身份验证并被授予从服务器获取数据更新的权限。
不要忘记,如果您通过 Catalyst 控制器更改数据库中的某些用户数据,并且此更改必须反映在会话哈希,你必须通过调用来持久化它
$c->persist_user();
comet 应用程序,客户端
我使用 javascript 模块的模块模式为每个 javascript 模块创建一个单独的命名空间。这是一个处理与彗星服务器通信的方法。
var myapp = (function() {
// Context data, private
var data_loaded = false;
var this_page = true;
return {
init: function() {
data_loaded = false;
this_page = true;
// No websockets in safari, somehow they don't work there
if ( navigator.userAgent.indexOf('Safari') != -1 && navigator.userAgent.indexOf('Chrome') == -1 ) {
myapp.myapp_load_ajax();
}
else {
// Create a websocket
websockets["myapp_socket"] = new WebSocket('ws://my-domain-name:5000/comet/myapp');
var input_hash = {};
input_hash["is_new"] = 1;
websockets["myapp_socket"].addEventListener("open", function(e) {
websockets["myapp_socket"].send(JSON.stringify(input_hash));
data_loaded = true;
});
websockets["myapp_socket"].addEventListener("message", function(e) {
var this_obj = JSON.parse(e.data);
// Connection is initialized
if ( this_obj.init_data ) {
// Make necessary initializations
myapp.init_after_loading();
}
// Websockets data update from server
else if ( this_obj.data ) {
// Do something meaningful on data update
}
});
websockets["myapp_socket"].addEventListener("close", function(e) {
//Connection has been closed
});
// In case when a websocket cannot be created, fall back to an AJAX request
websockets["myapp_socket"].addEventListener("error", function(e) {
// Unless the data have already been loaded, load it here for the first time, because
// this method will be also invoked when connection is dropped.
if ( !data_loaded && this_page ) {
myapp.myapp_load_ajax();
}
});
}
},
myapp_load_ajax: function() {
jQuery.ajax({
type: "POST",
url: "my_catalyst_app_load_ajax_data_path",
dataType: "json",
cache: false,
complete: function (xhr, textStatus) {
if ( xhr.responseText ) {
var this_data = jQuery.parseJSON(xhr.responseText);
if ( !this_data ) {
// An error happened
}
else {
// Make necessary initializations after your data has been inserted into the DOM
myapp.init_after_loading();
}
}
}
});
},
init_after_loading: function() {
// If you insert some data into DOM, initialize it here
},
close_socket: function() {
if ( websockets["myapp_socket"] ) {
websockets["myapp_socket"].close();
delete websockets["myapp_socket"];
}
},
};
})();
您可以在 Internet 上的其他地方详细了解模块模式。简而言之,它从您的 javascript 模块中创建一个对象,并且您的方法将作为该对象的属性进行访问。这允许您为您的模块创建一个单独的命名空间并定义私有变量。
为了能够从另一个模块访问我的 websocket(这可能是必要的),我声明了一个将其作为全局对象保存的对象。
我们要做的是为我们的 websocket 定义事件处理程序,包括“打开”、“关闭”等。如果由于某种原因我们无法建立到我们的彗星服务器的 websocket 连接(服务器关闭,它不接受新连接等),我们回退到 AJAX。此外,如果 safari 尝试创建与死机服务器的 websocket 连接,它不会在“错误”事件中处理这种情况,因此我们只是在 safari 中禁止 websockets。
因此,我们首先创建一个新的 websocket我们在彗星服务器的 plack 构建器中安装的 URL 的连接。然后我们使用 websocket 的事件“open”来处理新的连接,表示服务器关于新的客户端连接。事件“消息”用于向彗星服务器发送消息;每当关闭与服务器的连接时,都会调用“close”;在连接出现问题的情况下调用“错误”,例如无法建立连接或已断开连接或服务器已关闭连接或死亡。就是这样。现在我们将从彗星服务器获取数据更新。
启动彗星服务器
现在剩下的就是启动我们的服务器了。当前服务器的代码假定它将与您的 Catalyst 应用程序在同一台机器上运行。其他一些可能性将在最后的注释部分讨论。
我们使用命令行实用程序 plackup 来启动我们的服务器:
TWIGGY_DEBUG=1 plackup -s Twiggy comet.psgi
我使用TWIGGY_DEBUG env var 查看来自 Twiggy 服务器的调试信息。
结语
首先要记住的是,一旦执行 die 语句,Twiggy 服务器就会退出。这意味着您必须对其进行安全编程,并使用 eval 块拦截所有可能导致此问题的语句。
当前彗星服务器的前提是它与您的 Catalyst 应用程序在同一台机器上运行。如果您打算进行负载平衡并在另一台机器上运行它,您必须处理一些事情。首先,您必须将会话插件更改为Session::Store::DBI 或适合在多台机器上分发的内容(然后调整_check_session() 以不从文件中获取数据,而是从数据库中获取数据)。然后更改数据库连接的dsn 以包含主机名和端口号。
另外需要注意的是,如果用户名已更改,我们在服务器中的主要逻辑每N 秒检查一次。因此,如果您的服务器有很多客户端,那么经常查询数据库是低效的。相反,有一些更好的解决方案。第一种选择是,如果您在单台机器上运行 Catalyt 应用程序和 Comet 服务器,您可以使用 Cache::FastMmap 文件作为 Catalyst 应用程序和 Comet 服务器之间的中介,以获取有关服务器有一些新数据可用的通知然后才查询数据库以获取数据更新。在这种情况下,您进行数据库查询只是为了获取新数据。这意味着在您的 Catalyst 控制器中,您必须在每次通过控制器对您在彗星服务器中更新的数据进行更改时写入缓存文件以通知彗星服务器检查数据。例如,您有一个用户控制器和一个用户模型。每当用户更改他的名字时,都会调用用户控制器,然后调用用户模型来更改用户的数据。因此,在控制器用户中,您另外写入缓存文件就是这种情况。然后,彗星服务器将知道何时从数据库中获取数据更新。如果您进行负载平衡并且您的 Catalyst 和 Comet 应用程序在不同的机器上运行,您可以使用类似的方法。但在这种情况下,您必须使用您的数据库作为中介。例如,您可以创建一个新表,comet 服务器将定期查询该表。表的每一列都可以对应于你的彗星服务器的某个应用程序域。该列可以是timestamp 类型,并标记您跟踪的数据的最后更改时间。在您的 Catalyst 控制器中,每当相关数据发生更改时,您都会写入相应的列,然后您在彗星服务器中检查此列,然后您就知道是否查询数据库以获取数据更新。因此,您将避免许多不必要的数据查询。但是更好的选择,无论多么复杂,都会涉及到套接字。在这种情况下,当用户登录时,我们为他创建一个新套接字,并将我们想要跟踪的所有数据更新直接写入套接字。在彗星应用程序中,我们没有使用 Anyevent 计时器,而是定义了另一个 AnyEvent::Handle,我们使用用户的套接字进行了初始化。通过使用on_read() 方法,我们可以在更新时获得更新,然后立即将其返回给用户。在这种情况下,我们绕过数据查询,它应该工作得非常快。但是这个解决方案需要在 Catalyst 控制器中进行大量额外的工作。
另外需要注意的是,当前的 comet 服务器不支持安全 websockets (wss) 协议,而 Twiggy 不支持TLS/SSL。解决方案是在您的服务器前面使用 SSL 隧道,它将透明地加密/解密消息(查看 https://github.com/vti/app-tlsme)。
最后一点:我尝试运行前端代理 nginx在我的彗星服务器前。但不知何故,nginx 无法将消息传播到 Twiggy。所以客户端的浏览器直接与彗星服务器通信。如果您计划拥有成千上万的用户,那么 websocket 负载平衡是一个需要考虑的话题。
如果您发现任何错误或有任何改进想法,请发表评论或给我写电子邮件 (dhyana1981@yahoo.de)。