测试工具 http://www.blue-zero.com/WebSocket/
2018年8月6日17:28:24
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use Workerman\Worker; use App\Work\ChatroomWork; class Chatroom extends Command { protected $taskserver; /* * 操作参数 * 注意只能在 * start 启动 * stop 停止 * relaod 只能重启逻辑代码,核心workerman_init无法重启,注意看官方文档 * status 查看状态 * connections 查看连接状态(需要Workerman版本>=3.5.0) * * 库 composer require workerman/workerman */ protected $action = array(\'start\', \'stop\', \'reload\', \'status\', \'connections\'); /** * The name and signature of the console command. * * @var string */ protected $signature = \'Chatroom {action}\'; /** * The console command description. * * @var string */ protected $description = \'Chatroom\'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * * @return mixed * * 注意上线提供的方法 * */ public function handle() { $action = $this->argument(\'action\'); if (!in_array($action, $this->action)) { $this->error(\'Error Action\'); exit; } //初始化workerman ChatroomWork::workerman_init($action); } }
<?php namespace App\Work; use App\Work\BaseWork as Base; use Illuminate\Support\Facades\DB; use App\Work\CommonWork; use Workerman\Worker; use Workerman\Lib\Timer; use App\Models\OperationLog; use App\Models\Users; class ChatroomWork extends Base { //全局的总连接数 static $connection_count = 0; //单个房间最大的连接数 static $room_max_numbers; //房间所有的用户连接数ID集合,判断发送给那些用户,绑定用户和connection_id static $room_connection_array = []; public static function workerman_init($action = null) { global $argv; $argv[0] = \'workerman:websocket\'; $argv[1] = $action; // $argv[2] = \'-d\'; // 心跳 define(\'HEARTBEAT_TIME\', 30); //初始化 $worker = new Worker("websocket://172.17.1.247:9090"); $worker->name = \'Chatroom\'; //linux 用户线上是www // $worker->user = \'www\'; //守护模式信息输出文件地址 // $worker->stdoutFile = "./workerman.log"; //工作进程总数 测试环境4个 $worker->count = 4; //正式环境 // $ws->count = 10; //建立链接 处理逻辑 $worker->onConnect = function($connection) { // 有新的客户端连接时,连接数+1 self::$connection_count++; self::onConnect($connection); }; //接受消息 处理逻辑 $worker->onMessage = function($connection, $data) { self::onMessage($connection, $data); }; //关闭链接 处理逻辑 $worker->onClose = function($connection) { // 客户端关闭时,连接数-1 self::$connection_count--; self::onClose($connection); }; // 进程启动后设置一个30秒运行一次的定时器 $worker->onWorkerStart = function($worker) { // Timer::add(1, function()use($worker) { // $time_now = time(); // foreach ($worker->connections as $connection) { // // } // }); }; // 开始 Worker::runAll(); } //建立链接 处理逻辑 public static function onConnect($connection) { //主动心跳ping测试60秒一次 // Timer::add(HEARTBEAT_TIME, function() use($connection) { // $connection->send(json_encode([\'code\' => 200, \'msg\' => \'服务存活\', \'data\' => [], \'connections\' => self::$connection_count])); // }); } //接受消息 处理逻辑 public static function onMessage($connection, $data) { //解析数据,非合法的json数据不处理 if (!empty($data)) { if (is_json($data)) { $data = json_decode($data, true); if ($data[\'action_type\'] == \'ping\') { // 客户端回应服务端的心跳 $connection->send(json_encode([\'code\' => 200, \'msg\' => \'服务存活\', \'data\' => [], \'connections\' => self::$connection_count])); } elseif ($data[\'action_type\'] == \'login\') { //用户登录 if ($data[\'is_login\'] == 1) { //匿名登录 self::$room_connection_array[$data[\'room_id\']][$connection->id][\'user_name\'] = \'匿名用户\' . $connection->id; $connection->send(json_encode([\'code\' => 200, \'msg\' => \'匿名登录成功\', \'data\' => self::$room_connection_array, \'connections\' => self::$connection_count])); } elseif ($data[\'is_login\'] == 2) { //已登录 $Users = Users::where(\'id\', $data[\'user_id\'])->first(); if (empty($Users)) { $connection->send(json_encode([\'code\' => 201, \'msg\' => \'用户ID无效或者错误\', \'data\' => [], \'connections\' => self::$connection_count])); } else { $Users = $Users->toArray(); self::$room_connection_array[$data[\'room_id\']][$connection->id][\'user_name\'] = empty($Users[\'realname\']) ? $data[\'user_id\'] : $Users[\'realname\']; $connection->send(json_encode([\'code\' => 200, \'msg\' => \'登录成功\', \'data\' => self::$room_connection_array, \'connections\' => self::$connection_count])); } } else { $connection->send(json_encode([\'code\' => 201, \'msg\' => \'登录类型数据错误\', \'data\' => [], \'connections\' => self::$connection_count])); } } elseif ($data[\'action_type\'] == \'broadcast_to_all\') { //只发给房间的所有的人,除去自己 foreach ($connection->worker->connections as $con) { foreach (self::$room_connection_array[$data[\'room_id\']] as $k => $v) { // p($con->id); // p($k); if ($k == $con->id && $con->id != $connection->id) { $con->send($data[\'message\']); } } } } elseif ($data[\'action_type\'] == \'broadcast_to_one\') { } else { $connection->send(json_encode([\'code\' => 201, \'msg\' => \'action_type类型错误\', \'data\' => [], \'connections\' => self::$connection_count])); } } } else { $connection->send(json_encode([\'code\' => 201, \'msg\' => \'数据请求为空\', \'data\' => [], \'connections\' => self::$connection_count])); } } //关闭链接 处理逻辑 public static function onClose($connection) { //检索$connection->id 是否有在self::$room_connection_array中,有的话就剔除 foreach (self::$room_connection_array as $k => $v) { foreach ($v as $kk => $vv) { if ($kk == $connection->id) { unset(self::$room_connection_array[$k][$connection->id]); } } } // p(self::$room_connection_array[\'100\']); } }
{"action_type":"login","is_login":1,"room_id":101} //登录
{"action_type":"broadcast_to_all","room_id":100,"message":"111"} //发送消息
为什么不用switch代码会看起来更清晰,因为有bug,对字符串匹配的不好
注意:
1,$connection就是当前的连接数据
2,因为未根据work id做房间划分,不知道在超出单个work连接时候会不会出问题
3,可以根据实际压力去划分一个work的最大连接数,这里是简单的测试demo所以未做具体的细节划分
4,这里应该结合session来做处理数据,但是我只是根据发送数据来区别用户,你可以在登录的发送数据的时候根据session处理数据,看需要,必须要返回用户列表,就直接把room id下面的所有用户名返回就OK
5,workerman用起来其实还是比较简单的,但是我这种结合laravel的整合是有问题,比如现在我有一个消息推送,一个聊天室就没办法放在一起使用,必须用别的办法,如果是单功能就比较容易,直接结合,我有一个骚办法就是直接
复制artisan入口文件,直接增加新入口artisan1,经过测试完全没有问题,但是其实不是很好的解决方案,如果要用就先这么上吧
如果你觉得麻烦可以是gateway做比较简单
2019年7月12日09:43:56
注意:上面是临时测试代码。业务代码使用try catch处理异常和错误