swoolechat.php
<?php
use Swoole\Coroutine as co;
/**
 * Minimal Swoole WebSocket chat server.
 *
 * Protocol (text frames):
 *  - "heart" or anything that is not a JSON object with a "type" key
 *    is answered with the literal frame "heart".
 *  - {"type":"band","roomid":..,"userid":..} binds the sending connection
 *    to a room (one row per connection in table fa_fdband).
 *  - {"type":"chatsend","msg":..} stores the message in fa_fdchat and
 *    forwards it to every connection bound to the sender's room.
 * Replies are JSON objects of the form {"type","status","msg"}.
 *
 * Constructing the object creates the server AND starts it, so
 * `new swoolechat()` blocks until the server stops.
 */
class swoolechat {
    // Legacy scratch properties. They are kept only so existing code that
    // touches them keeps working; the class itself no longer passes request
    // data through them, because sharing per-request state on the instance
    // lets concurrent coroutines overwrite each other's values.
    public $sql_string = null;
    public $sql_res = null;
    public $map = null;

    // The WebSocket server instance (created in the constructor).
    public $ws;

    // Options handed to the server's set() call.
    public $configws = [
        'worker_num' => 1,        // number of worker processes
        'max_request' => 10000,   // a worker restarts after handling this many requests
        'max_conn' => 10000,      // maximum number of simultaneously open connections
        'ipc_mode' => 1,          // 1 = unix socket, 2 = message queue, 3 = message queue (contended)
        'dispatch_mode' => 2,     // 2 = fixed (one connection always served by the same worker), 1 = round robin, 3 = contended
        'debug_mode' => 1,
        'daemonize' => 0,         // 0 = stay in the foreground
        // NOTE(review): backslash-separated path; on a POSIX host this is a
        // single odd file name rather than log/swoole.log. Confirm the intent.
        'log_file' => '\log\swoole.log',
        'heartbeat_check_interval' => 30,  // seconds between idle-connection sweeps
        'heartbeat_idle_time' => 600,      // seconds of silence after which a connection is closed
    ];

    // MySQL connection settings.
    // NOTE(review): credentials are hard-coded in source; they should be
    // loaded from the environment or an untracked config file.
    public $configsql = [
        'host' => '127.0.0.1',
        'port' => 3306,
        'user' => 'ws_ltcpi_com',
        'password' => 'La2bKpT4T2xrrkLL',
        'database' => 'ws_ltcpi_com',
        'charset' => 'utf8',
        'timeout' => 10,
    ];

    /**
     * Create the WebSocket server on 0.0.0.0:9501, register the event
     * handlers and start the server (blocking call).
     */
    public function __construct() {
        // Namespaced class name instead of the old underscore alias
        // swoole_websocket_server, which newer Swoole releases no longer ship.
        $this->ws = new \Swoole\WebSocket\Server("0.0.0.0", 9501);
        $this->ws->set($this->configws);

        // Connection opened: greet the client.
        $this->ws->on('open', function ($ws, $request) {
            echo "server: handshake success with fd{$request->fd}\n";
            $this->ws->push($request->fd, "hello, welcome\n");
        });

        // Frame received: hand it to the command dispatcher.
        $this->ws->on('message', function ($ws, $frame) {
            $this->onmessage($ws, $frame->fd, $frame->data);
        });

        // Connection closed.
        $this->ws->on('close', function ($ws, $fd) {
            echo "client-{$fd} is closed\n";
        });

        echo "启动3服务\n";
        $this->ws->start();
    }

    /**
     * Build an INSERT statement by string concatenation.
     *
     * @deprecated Values are embedded without escaping, so this must never be
     *             fed client-supplied data. Kept unchanged for subclasses;
     *             the class itself uses prepared statements instead.
     *
     * @param string $table table name
     * @param array  $arr   column => value map
     * @return string SQL text
     */
    protected function get_sql_insert($table,$arr){
        $columns = implode(',', array_keys($arr));
        $values = implode(',', array_map(function ($value) {
            return "'".$value."'";
        }, $arr));
        return " insert into ".$table." (".$columns.") values (".$values.")";
    }

    /**
     * @deprecated Unescaped string-built SQL; kept for backward compatibility only.
     * @return string DELETE statement for the fa_fdband row(s) of one connection
     */
    protected function get_sql_delete_fd($fd){
        return "DELETE FROM fa_fdband WHERE fd = '{$fd}'";
    }

    /**
     * @deprecated Unescaped string-built SQL; kept for backward compatibility only.
     * @return string DELETE statement for the fa_fdband row(s) of one user
     */
    protected function get_sql_delete_userid($userid){
        return "DELETE FROM fa_fdband WHERE userid = '{$userid}'";
    }

    /**
     * Send a {"type","status","msg"} JSON reply to one connection.
     *
     * json_encode() is used with its default flags on purpose: non-ASCII
     * characters are emitted as \uXXXX escapes, so the payload is pure ASCII
     * (which is also why the former utf8_encode() call was a no-op and has
     * been dropped).
     *
     * @param int|string $fd     target connection id
     * @param string     $type   reply type, e.g. 'band', 'chatsend', 'chatrev'
     * @param int        $status 1 = success, 0 = failure
     * @param mixed      $msg    message text or structured payload
     */
    protected function pushtofd($fd,$type,$status,$msg){
        $payload = json_encode(['type' => $type, 'status' => $status, 'msg' => $msg]);
        if ($payload === false) {
            // e.g. $msg contained invalid UTF-8
            echo "pushtofd: json_encode failed: ".json_last_error_msg()."\n";
            return;
        }
        $fd = (int)$fd;
        // Rows in fa_fdband can outlive their connection, so skip targets
        // that are no longer an established WebSocket connection.
        if (!$this->ws->isEstablished($fd)) {
            return;
        }
        $this->ws->push($fd, $payload);
    }

    /**
     * @deprecated Unescaped string-built SQL; kept for backward compatibility only.
     * @return string SELECT statement for the bindings of one connection
     */
    protected function get_sql_bandlist($fd){
        return "select * from fa_fdband where fd='{$fd}'order by id desc";
    }

    /**
     * Open a MySQL connection for the current coroutine.
     *
     * @return object|null connected client, or null when connecting failed
     */
    private function connect_db(){
        $db = new co\MySQL();
        if (!$db->connect($this->configsql)) {
            echo "mysql connect failed: {$db->connect_error}\n";
            return null;
        }
        return $db;
    }

    /**
     * Run one parameterised statement.
     *
     * @param object $db     connected MySQL client
     * @param string $sql    SQL text with ? placeholders
     * @param array  $params values bound to the placeholders, in order
     * @return array|bool result rows for SELECT, true for other statements,
     *                    false on failure
     */
    private function run_statement($db, $sql, array $params){
        $stmt = $db->prepare($sql);
        if ($stmt === false) {
            echo "mysql prepare failed: {$db->error}\n";
            return false;
        }
        $result = $stmt->execute($params);
        if ($result === false) {
            echo "mysql execute failed: {$stmt->error}\n";
        }
        return $result;
    }

    /**
     * Insert one row with bound values.
     *
     * @param object $db    connected MySQL client
     * @param string $table table name (must come from code, never from a client)
     * @param array  $row   column => value map (column names come from code)
     * @return array|bool see run_statement()
     */
    private function insert_row($db, $table, array $row){
        $columns = implode(',', array_keys($row));
        $placeholders = implode(',', array_fill(0, count($row), '?'));
        return $this->run_statement(
            $db,
            "INSERT INTO {$table} ({$columns}) VALUES ({$placeholders})",
            array_values($row)
        );
    }

    /**
     * Dispatch one incoming text frame.
     *
     * @param object     $ws   server object the frame arrived on
     * @param int|string $fd   sender's connection id
     * @param string     $data raw frame payload
     */
    public function onmessage($ws,$fd,$data){
        $data_arr = json_decode($data, true);
        // Heartbeat, invalid JSON, non-object JSON, or an object without
        // "type": answer with the heartbeat frame. The is_array() check is
        // needed because json_decode() yields null or a scalar for such input.
        if ($data === 'heart' || !is_array($data_arr) || !array_key_exists('type', $data_arr)) {
            $ws->push($fd, 'heart');
            return;
        }

        // Strict comparison: with ==, a numeric or boolean "type" could
        // match both commands on older PHP versions.
        $type = $data_arr['type'];
        if ($type === 'band') {
            // Join a room.
            if (!isset($data_arr['roomid'], $data_arr['userid'])
                || !is_scalar($data_arr['roomid'])
                || !is_scalar($data_arr['userid'])) {
                $this->pushtofd($fd, 'band', 0, '登录房间失败,缺少roomid或userid');
                return;
            }
            $this->band($fd, $data_arr['roomid'], $data_arr['userid']);
        } elseif ($type === 'chatsend') {
            // Send a chat message.
            if (!isset($data_arr['msg']) || !is_scalar($data_arr['msg'])) {
                $this->pushtofd($fd, 'chatsend', 0, '发送失败,缺少msg');
                return;
            }
            $this->chatsend($fd, $data_arr['msg']);
        }
    }

    /**
     * Bind a connection to a room.
     *
     * Removes any previous binding of this connection and of this user,
     * then inserts the new binding. The first failing step is reported to
     * the client and aborts the operation (earlier versions reported the
     * failure but carried on).
     *
     * @param int|string $fd     connection id
     * @param int|string $roomid room to join
     * @param int|string $userid user owning the connection
     */
    public function band($fd,$roomid,$userid){
        $row = [
            'fd' => $fd,
            'userid' => $userid,
            'roomid' => $roomid,
            'createtime' => time(),
        ];
        // The row is captured by value: each coroutine works on its own copy
        // even when several messages are processed concurrently.
        go(function () use ($row) {
            $fd = $row['fd'];
            $db = $this->connect_db();
            if ($db === null) {
                $this->pushtofd($fd, 'band', 0, '登录房间失败,数据库连接失败');
                return;
            }
            // Drop an older binding of this connection.
            if ($this->run_statement($db, 'DELETE FROM fa_fdband WHERE fd = ?', [$fd]) === false) {
                $this->pushtofd($fd, 'band', 0, '登录房间失败,删除同fd失败');
                return;
            }
            // Drop an older binding of this user.
            if ($this->run_statement($db, 'DELETE FROM fa_fdband WHERE userid = ?', [$row['userid']]) === false) {
                $this->pushtofd($fd, 'band', 0, '登录房间失败,删除同userid失败');
                return;
            }
            // Create the new binding.
            if ($this->insert_row($db, 'fa_fdband', $row) === false) {
                $this->pushtofd($fd, 'band', 0, '登录房间失败,因为插入失败');
                return;
            }
            $this->pushtofd($fd, 'band', 1, '登录房间成功');
        });
    }

    /**
     * Store a chat message and forward it to the sender's room.
     *
     * The sender must have bound a room first. The sender receives a
     * 'chatsend' acknowledgement; every connection bound to the room
     * (the sender included) receives a 'chatrev' frame.
     *
     * @param int|string $fd  sender's connection id
     * @param string     $msg message text
     */
    public function chatsend($fd,$msg){
        // Arguments are captured by value so concurrent messages cannot
        // overwrite each other.
        go(function () use ($fd, $msg) {
            $db = $this->connect_db();
            if ($db === null) {
                $this->pushtofd($fd, 'chatsend', 0, '发送失败,数据库连接失败');
                return;
            }
            // Look up the sender's binding.
            $bindings = $this->run_statement(
                $db,
                'SELECT * FROM fa_fdband WHERE fd = ? ORDER BY id DESC',
                [$fd]
            );
            if ($bindings === false) {
                $this->pushtofd($fd, 'chatsend', 0, '发送失败,查询绑定信息失败');
                return;
            }
            if (!is_array($bindings) || count($bindings) < 1) {
                $this->pushtofd($fd, 'chatsend', 0, '请先绑定房间');
                return;
            }
            // Persist the message.
            $record = [
                'userid' => $bindings[0]['userid'],
                'roomid' => $bindings[0]['roomid'],
                'msg' => $msg,
                'createtime' => time(),
            ];
            if ($this->insert_row($db, 'fa_fdchat', $record) === false) {
                $this->pushtofd($fd, 'chatsend', 0, '发送失败,保存消息失败');
                return;
            }
            $this->pushtofd($fd, 'chatsend', 1, 'success');
            // Forward to everyone bound to the same room.
            $members = $this->run_statement(
                $db,
                'SELECT * FROM fa_fdband WHERE roomid = ? ORDER BY id DESC',
                [$record['roomid']]
            );
            if (!is_array($members)) {
                return;
            }
            foreach ($members as $member) {
                // The sender sees their own message under a different nickname.
                $nickname = ((int)$member['fd'] === (int)$fd) ? '我' : '追风少年';
                $this->pushtofd($member['fd'], 'chatrev', 1, [
                    'nickname' => $nickname,
                    'msg' => $record['msg'],
                ]);
            }
        });
    }
}
// Instantiate the chat server. The constructor registers the handlers and
// calls start(), so this statement blocks while the server is running.
// (PHP class names are case-insensitive, so SwooleChat resolves to swoolechat.)
$obj=new SwooleChat();
client.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>webSocket客户端接收命令</title>
</head>
<body>
<p id="status"></p>
<input type="text" id="login" style="width: 500px" value='{"type":"band","roomid":"3","userid":"4"}'/>
<button onclick="login()">绑定房间</button>
<br/>
<input type="text" id="chat" style="width: 500px" value='{"type":"chatsend","msg":"大家好我是新人4"}'/>
<button onclick="chat()">发送消息</button>
<!--<button onclick="clientClose()">关闭连接</button>-->
<!--<button onclick="clientBreak()">彻底断开</button>-->
<script type="text/javascript">
// Shared client state, declared in one statement.
var lockReconnect = false,               // true while a reconnect attempt is already scheduled
    wsUrl = "ws://124.156.153.202:9501", // WebSocket endpoint of the chat server
    ws;                                  // current connection, assigned by createWebSocket()
/**
 * Make a server frame human-readable.
 *
 * The server sends JSON whose non-ASCII characters are written as \uXXXX
 * escapes. Parsing and re-serialising the frame decodes every JSON escape
 * correctly (\uXXXX, \/, \" ...). The previous implementation swapped each
 * backslash for "%" and called the deprecated unescape(), which left
 * sequences such as \/ corrupted.
 *
 * @param {string} str raw frame payload
 * @returns {string} the JSON text with escapes decoded, or the input
 *                   unchanged when it is not valid JSON (e.g. the plain-text
 *                   greeting)
 */
function decodeUnicode(str) {
    try {
        return JSON.stringify(JSON.parse(str));
    } catch (e) {
        // Not JSON: show the frame as received.
        return str;
    }
}
// Open the initial connection as soon as the script runs. createWebSocket is
// a function declaration further down, so it is already callable here.
createWebSocket();
/**
 * Open a WebSocket to wsUrl, store it in the global `ws` and wire up the
 * event handlers. Any close or error schedules a reconnect.
 */
function createWebSocket() {
    try {
        ws = new WebSocket(wsUrl);

        // Incoming frame from the server.
        ws.onmessage = function (event) {
            onMessage(event);
        };
        // Connection established.
        ws.onopen = function (event) {
            onOpen(event);
        };
        // Transport error: schedule the reconnect first, then report.
        ws.onerror = function (event) {
            websocketReconnect(wsUrl);
            onError(event);
        };
        // Connection closed: report, then schedule the reconnect.
        ws.onclose = function (event) {
            onClose(event);
            websocketReconnect(wsUrl);
        };
    } catch (err) {
        // The constructor itself failed; retry later.
        console.log('catch');
        websocketReconnect(wsUrl);
    }
}
// ---- Passive handlers (invoked by socket events) ----

/** Connection established: log it and (re)start the heartbeat timer. */
function onOpen(evt) {
    var statusEl = document.getElementById('status');
    statusEl.innerText = statusEl.innerText + ("receive:" + "建立链接成功..." + "\n");
    heartCheck.start();
}
/** Connection closed: append a notice to the status area. */
function onClose(evt) {
    var statusEl = document.getElementById('status');
    statusEl.innerText = statusEl.innerText + ("receive:" + "连接已关闭..." + "\n");
}
/**
 * Frame received: show it unless it is the heartbeat reply, and restart the
 * heartbeat timer in either case.
 */
function onMessage(evt) {
    var isHeartbeatReply = evt.data === 'heart';
    if (!isHeartbeatReply) {
        var statusEl = document.getElementById('status');
        statusEl.innerText = statusEl.innerText + ("receive:" + decodeUnicode(evt.data) + "\n");
    }
    heartCheck.start();
}
/** Transport error: append a notice to the status area. */
function onError(evt) {
    var statusEl = document.getElementById('status');
    statusEl.innerText = statusEl.innerText + ("通信错误" + "\n");
}
/**
 * Schedule one reconnect attempt 5 seconds from now.
 *
 * lockReconnect makes sure only one attempt is pending even though both the
 * close and the error handler ask for a reconnect.
 *
 * The lock is released BEFORE createWebSocket() runs: if opening the socket
 * fails synchronously, createWebSocket() calls back into this function, and
 * that call must be able to schedule the next attempt. Releasing the lock
 * afterwards silently ended the retry chain.
 *
 * @param {string} url forwarded to createWebSocket(); that function takes no
 *                     parameters and connects to the global wsUrl
 */
function websocketReconnect(url) {
    if (lockReconnect) {
        // An attempt is already scheduled.
        return;
    }
    lockReconnect = true;
    // Delay the attempt so a dead server is not hammered with requests.
    setTimeout(function () {
        lockReconnect = false;
        createWebSocket(url);
    }, 5000);
}
/**
 * Heartbeat.
 *
 * start() (re)arms a timer: after `timeout` ms of silence a "heart" frame is
 * sent, and if nothing restarts the timer within another `timeout` ms the
 * socket is closed (which triggers the reconnect logic).
 *
 * The socket is captured when start() is called, so a timer left over from
 * an old connection can neither send on nor close a socket created later by
 * a reconnect.
 */
var heartCheck = {
    timeout: 5000,
    timeoutObj: null,
    serverTimeoutObj: null,
    start: function () {
        var self = this;
        var socket = ws; // the connection this heartbeat belongs to
        this.timeoutObj && clearTimeout(this.timeoutObj);
        this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
        this.timeoutObj = setTimeout(function () {
            // Skip the ping when the socket is gone or not open any more;
            // send() on a connecting socket would throw.
            if (!socket || socket.readyState !== WebSocket.OPEN) {
                return;
            }
            socket.send("heart");
            console.log('send:start');
            // No reply in time: give up on this connection.
            self.serverTimeoutObj = setTimeout(function () {
                socket.close();
            }, self.timeout);
        }, this.timeout);
    }
};
// ---- Active commands (triggered by the buttons) ----

/** Send the content of the "login" input (a band command) to the server. */
function login() {
    var loginInput = document.getElementById('login');
    ws.send(loginInput.value);
}
/** Send the content of the "chat" input (a chatsend command) to the server. */
function chat() {
    var chatInput = document.getElementById('chat');
    ws.send(chatInput.value);
}
/** Close the connection on the user's request and note it in the status area. */
function clientClose() {
    ws.close();
    var statusEl = document.getElementById('status');
    statusEl.innerText = statusEl.innerText + ("客户端主动普通断开" + "\n");
}
</script>
</body>
</html>
sql
https://files.cnblogs.com/files/bufeetu/websocket.7z