需要下载的资料和参考文档


在实际项目中即时消息的展示必不可少,比如站内信,在线客服等,一般这类实现由两种方式,一种是采用ajax轮询的方式拉取,还有一种就是采用websocket主推推送的方式。其中主动推送可以节约服务器资源,有效减少无效的访问和查询,所以越来越被大家所推崇。接下来我们就讲讲如何采用websocket的方式完成一次消息推送。
在php中推送的框架很多,比较出名和好用的应该是swoole和workerman。其中swoole是php的c扩展,workerman纯php写的socket处理框架。有个区别就是swoole没有windows版本,而workerman有windows版本,这样我们就可以在win上开发,无缝迁移到linux,so我们就采用workerman作为框架。
首先下载开发用的win版本workerman增强版Gateway代码https://github.com/walkor/GatewayWorker-for-win 在YII的根目录创建一个目录叫pushServer,然后修改下载的源码根目录文件夹名称为dev表示我们的开发环境(还可以下载linux版本的放进来叫product表示生产环境)。修改applications下面的文件夹yourApp为blog。所以最终的文件夹目录结构如下:
yii基于workerman的pushserver实现消息推送
使用gateway推送的整个推送流程可以参考官方文档http://www.workerman.net/gatewaydoc/advanced/push.html
1.Yii通过gateway提供的内部协议(例如纯文本通信协议)向gateway发送消息
2.gateway接收到YII代码发送的消息,然后将消息通过websocket推送的监听的前端网页
所以整个看上去pushserver像是一个二传手,只是作为推送的服务器使用(其实推送还可以选用nodejs等天生就具备推送长项的框架来使用,模式基本一样套用即可)。
首先我们完成第一步,用YII的php代码向gateway的内部协议发送推送数据。我们在pushServer/applictions/YourApp中创建一个基于文本协议的gateway文件start_text_gateway.php代码如
下:

<?php
/**
* Created by PhpStorm.
* User: lxf
* Date: 2017/2/28
* Time: 10:03
*/
use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;

// 自动加载类
require_once __DIR__ . '/../../vendor/autoload.php';
// #### 内部推送端口(假设当前服务器内网ip为本机) ####
$internal_gateway = new Gateway("Text://127.0.0.1:8806");
$internal_gateway->name='internalGateway';
$internal_gateway->startPort = 2800;
// 端口为start_register.php中监听的端口,websocket推送默认是1238
$internal_gateway->registerAddress = '127.0.0.1:1238';
// #### 内部推送端口设置完毕 ####


if(!defined('GLOBAL_START'))
{
Worker::runAll();
}


接下来我们在YII中创建一个测试的console类型的PushCommand(protect/console/PushCommand.php)

<?php
class PushCommand extends CConsoleCommand
{
public function actionIndex()
{
$task_id = 'task 123456';
$open_id = 'open id 8888';
// 建立连接,@see http://php.net/manual/zh/function.stream-socket-client.php
$client = stream_socket_client('tcp://127.0.0.1:8806');
if(!$client) return "can not connect";
// 模拟超级用户,以文本协议发送数据,注意Text文本协议末尾有换行符(发送的数据中最好有能识别超级用户的字段)
//这样在Event.php中的onMessage方法中便能收到这个数据,然后做相应的处理即可
fwrite($client, '{"type":"send","task_id":"'.$task_id.'","openid":"'.$open_id.'"}'."\n");
}

}


接下来我们要完成在Events类中定义修改核心方法onMessage,这里会处理所有发来的消息,包括内部调用的消息和推送的消息

/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{

// debug
echo "client:{$_SERVER['REMOTE_ADDR']}:{$_SERVER['REMOTE_PORT']} gateway:{$_SERVER['GATEWAY_ADDR']}:{$_SERVER['GATEWAY_PORT']} client_id:$client_id session:"
.json_encode($_SESSION)." onMessage:".$message."\n";

var_export(Gateway::getAllClientSessions());echo "\n";

// 客户端传递的是json数据
$message_data = json_decode(rtrim($message), true);
if(!$message_data)
{
echo "message_data is null return\n";
return ;
}
// 根据类型执行不同的业务
switch($message_data['type'])
{
// 客户端回应服务端的心跳
case 'pong':
return;
// 客户端初始化 message格式: {type:init, task_id:xx}
case 'init':
$task_id = $message_data['task_id'];
Gateway::bindUid($client_id,$task_id);
Gateway::sendToUid($task_id,'{"type":"login_success"}');
return;
// Yii代码推送 message格式: {type:send, task_id:xx,openid:xxxx,data:xxxxxx}
case 'send':
$task_id = $message_data['task_id'];
Gateway::sendToUid($task_id,'{"type":"login_status","openid":"'.$message_data['openid'].'","data":"xxx"}');
return;
}
// 向所有人发送
// Gateway::sendToAll("$client_id said $message\r\n");
}

修改pushServer/start_for_win.bat
php Applications\YourApp\start_register.php Applications\YourApp\start_gateway.php Applications\YourApp\start_businessworker.php Applications\YourApp\start_text_gateway.php pause
这个时候执行下start_for_win.bat就可以在console中看到server执行的结果了

yii基于workerman的pushserver实现消息推送

我们再启动一个console,执行yii中console的push方法调用测试看看
yii基于workerman的pushserver实现消息推送

php protected/yiic.php push index


回到gateway的console上我们可以看到log显示
yii基于workerman的pushserver实现消息推送


看我们发送的消息已经被gateway所接收到了,接下来,我们继续完成模拟客户端websocket交互的代码
修改applications/blog/start_gateway.php修改gateway协议为websocket
$gateway = new Gateway("websocket://0.0.0.0:7272");

打开一个chrome,f12打开工具栏选择console
1.现在chorme命令台运行这段
var ws = new WebSocket("ws://localhost:7272");
ws.onopen = function(){
console.log("握手成功");
};
2.连接成功后就可以
ws.send('{"type":"init","task_id":"task 123456"}')ws.onmessage = function(e){ console.log(e.data);};

然后去yii的console中执行推送代码
php protected/yiic.php push index

在浏览器中就会显示log {"type":"login_status","openid":"open id 8888","data":"xxx"}
可见我没推送的数据已经被前端的浏览器获取到了,整个推送流程完成了。

相关文章: