【问题标题】:Using PHP Pthreads with Ratchet Websocket将 PHP Pthreads 与 Ratchet Websocket 一起使用
【发布时间】:2014-10-31 11:48:08
【问题描述】:

我正在制作一个html5游戏www.titansoftime.com

我使用棘轮作为 php websocket 服务器解决方案。效果很好! http://socketo.me/docs/push

我已经使用 php pthreads 扩展进行了几次独立测试,并看到了一些非常令人兴奋的结果。它确实有效并且运作良好..只要 websockets 不在混合中。

Pthreads 为 php 提供了多线程功能(它确实有效,而且非常棒)。 http://php.net/manual/en/book.pthreads.php

这就是我的工作:

/src/server.php 这是启动守护进程的文件。

    <?php
    session_start();

    use Ratchet\Server\IoServer;
    use Ratchet\WebSocket\WsServer;
    use MyApp\Pusher;

    require __DIR__ . '/../vendor/autoload.php';

    require_once __DIR__ . '/../mysql.cls.php';
    require_once __DIR__ . '/../game.cls.php';
    require_once __DIR__ . '/../model.cls.php';

    $mysql = new mysql;
    $game  = new game;

    $loop   = React\EventLoop\Factory::create();
    $pusher = new MyApp\Pusher();

    $loop->addPeriodicTimer(0.50, function() use($pusher){
        $pusher->load();
    });

    $webSock = new React\Socket\Server($loop);

    if ($loop instanceof \React\EventLoop\LibEventLoop) {
        echo "\n HAS LibEvent";
    }

    $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
    $webServer = new Ratchet\Server\IoServer(
            new Ratchet\Http\HttpServer(
                    new Ratchet\WebSocket\WsServer($pusher)
            ),
            $webSock
    );

    $loop->run();

这一切都很好。

/src/MyApp/Pusher.php 该类将数据推送给所有连接的用户。

<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

class AsyncThread extends \Thread{

    public $client;

    public function __construct($client){
        $this->client = $client;
    }

    public function run(){

        // do work on $this->client
        $user = mysql::assoc('SELECT * from users WHERE connection_id = "'.$this->client->resourceId.'"');
        // etc..
        $this->client->send(json_encode(array('foo'=>'bar')));

    }

}

class Pusher implements MessageComponentInterface{

    public static $clients = array();

    #load
    public static function load(){

        $client_count = count(self::$clients);

        echo "\n\n\n".'Serving to '.$client_count.' clients. '.time();

        $start = $istart = microtime(true);

        if( !count(self::$clients) ){
            if( !mysql_ping() ){
                $game->connect();
            }
        }

        $threads = array();
        foreach( self::$clients as $key => $client ){       

            // HANDLE CLIENT

            // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable
            $client->send(json_encode('foo'=>'bar'));

           // So I tried this:
           $threads[$key] = new AsyncThread($client);
           $threads[$key]->start();

           // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure. 
          // If I dont set "$this->data = $client;" in the thread constructor no error appears but now I cant use the data.

           // Also regardless of whether or not I bind the data in the AsyncThread constructor,
           // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior.

        }

    }

    public function onMessage(ConnectionInterface $from, $msg) {
        global $game;
        if( $msg ){
            $data = json_decode($msg);
            if( $data ){    

                switch( $data->task ){

                    #connect
                    case 'connect':
                        echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress;
                        self::$clients[] = $from;
                        break;

                    default:
                        self::closeConnection($from);
                        echo "\nNO TASK CLOSING";
                        break;

                }
            }else{
                echo "\n NO DATA";
                self::closeConnection($from);
            }
        }else{
            echo "\n NO MSG";
            self::closeConnection($from);
        }
    }

    public function closeConnection($conn){
        global $game;
        if( $conn ){
            if( $conn->resourceId ){
                $connid = $conn->resourceId;
                $conn->close(); 
                $new = array();
                foreach( self::$clients as $client ){
                    if( $client->resourceId != $connid ){
                        $new[] = $client;
                    }
                }
                self::$clients = $new;
                $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1');
                echo "\n".'Connection '.$connid.' has disconnected';
            }
        }
    }

    public function onClose(ConnectionInterface $conn) {
        echo "\nCLIENT DROPPED";
        self::closeConnection($conn);
    }

    public function onOpen(ConnectionInterface $conn) {
    }
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "\nCLIENT ERRORED";
        self::closeConnection($conn);
    }
    public function onSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    }

}

只要我不在事件循环中创建线程,这一切都可以正常工作。

我是不是走错了路,还是 php 多线程和 websockets 不兼容?

【问题讨论】:

  • 我认为没有必要实现多线程,如果您通过棘轮和反应的源代码,那么您将了解它使用的是无阻塞套接字读取功能。此外,如果您想要少量的性能提升,那么您可能需要研究 libevent。
  • 我使用 libevent。我认为按照您在我的应用程序中说明的方式使用 React 的唯一方法是为每个连接创建一个事件循环。这听起来很乱,但我会搞砸的。
  • 有关于这种方法的消息吗?我需要同时向不同的 Web 套接字客户端连续写入(因为数据是为每个特定的 ip:port 对连续生成的),而 Ratchet 在这种情况下不能很好地工作(它基本上等待你结束 onMessage 将数据返回到客户端,因此无法处理任何其他消息)。
  • 还没人,希望我能弄明白。
  • 霍布斯你找到解决办法了吗?连续几天面临序列化问题

标签: php multithreading websocket pthreads ratchet


【解决方案1】:

检查这个包https://github.com/huyanping/react-multi-process

安装

composer 需要 jenner/react-multi-process 怎么用?

这么简单:

$loop = React\EventLoop\Factory::create();
$server = stream_socket_server('tcp://127.0.0.1:4020');
stream_set_blocking($server, 0);
$loop->addReadStream($server, function ($server) use ($loop) {
    $conn = stream_socket_accept($server);
    $data = "pid:" . getmypid() . PHP_EOL;
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
        $written = fwrite($conn, $data);
        if ($written === strlen($data)) {
            fclose($conn);
            $loop->removeStream($conn);
        } else {
            $data = substr($data, 0, $written);
        }
    });
});

// the second param is the sub process count
$master = new \React\Multi\Master($loop, 20);
$master->start();

使用 jenner/simple_fork 的示例:

class IoServer {
     /**
     * @param int $count worker process count
     * Run the application by entering the event loop
     * @throws \RuntimeException If a loop was not previously specified
     */
    public function run($count = 1) {
        if (null === $this->loop) {
            throw new \RuntimeException("A React Loop was not provided during instantiation");
        }

        if($count <= 1){
            $this->loop->run();
        }else{
            $loop = $this->loop;
            $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) {
                $this->loop->run();
            }, $count);
            $master->start();
            $master->keep(true);
//            or just 
//            $master = new \React\Multi\Master($this->loop, $count);
//            $master->start();
        }
    }
}

【讨论】:

    猜你喜欢
    • 2015-01-15
    • 2021-09-16
    • 2016-06-09
    • 1970-01-01
    • 2016-08-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多