【问题标题】:Symfony + RabbitMQ + RatchetSymfony + RabbitMQ + Ratchet
【发布时间】:2021-12-07 17:20:00
【问题描述】:

我已经按照本手册设置了 websocket: http://socketo.me/docs/hello-world 它的工作。

但现在我需要从 php 向客户端发送消息,我不知道该怎么做。 我找到了一些我在项目中使用的 rabbitMQ 手册?像这样: https://github.com/ratchetphp/Ratchet/issues/659 但我不明白如何使用它。

也许有人知道?

【问题讨论】:

    标签: symfony rabbitmq ratchet


    【解决方案1】:

    您可以创建Symfony console command 来启动 RabbitMQ 消费者,启动 WebSocket 服务器并在从队列接收到消息后向客户端发送消息。

     protected function execute(InputInterface $input, OutputInterface $output): int
        {
            $loop = LoopFactory::create();
            $pusher = new MessageHandler();
    
            $queueAsyncClient = new AsyncClient($loop, [
                "host" => $_ENV['RABBITMQ_HOST'],
                "port" => $_ENV['RABBITMQ_PORT'],
                "vhost" => $_ENV['RABBITMQ_VHOST'],
                "user" => $_ENV['RABBITMQ_USERNAME'],
                "password" => $_ENV['RABBITMQ_PASSWORD'],
            ]);
    
            $connect = $queueAsyncClient->connect();
    
            // When client has connected, retrieve channel (as promise)
            $connect->then(function (AsyncClient $client) {
                return $client->channel();
                // Then declare the queue and exchange
            })->then(function (Channel $channel) {
                // These method calls all return promises, so we need to combine them
                return \React\Promise\all([
                    $channel,
    
                    // Create the queue we'll be using
                    $channel->queueDeclare($_ENV['RABBITMQ_QUEUE'], false, true),
    
                    // Declare an exchange
                    $channel->exchangeDeclare($_ENV['RABBITMQ_EXCHANGE'], 'direct', false, true),
    
                    // Bind the queue to the exchange
                    $channel->queueBind($_ENV['RABBITMQ_QUEUE'], $_ENV['RABBITMQ_EXCHANGE']),
                ]);
    
                // Then, when the exchange is all hooked up, hook up the pusher
            })->then(function ($connection) use ($pusher) {
                /** @var Channel $channel (see first section of all() promise above) */
                $channel = $connection[0];
    
                // On messages, consume them using the pusher
                return $channel->consume(
                    function (Message $message) use ($pusher, $channel) {
                        $content = json_decode($message->content, true);
                        $connections = $pusher->getUserConnections($content["uid"]);
                        foreach ($connections as $connection) {
                            $connection->send(json_encode($content["event"]));
                        }
                    },
                    $_ENV['RABBITMQ_QUEUE'],
                    '',
                    false,
                    true // Acknowledges messages
                );
            })->done();
    
            $webSocketServer = new \React\Socket\TcpServer("tcp://0.0.0.0:" . $_ENV['WEBSOCKET_PORT'] . "/ws", $loop);
    
    
            $wsServer = new WsServer($pusher);
            $wsServer->enableKeepAlive($loop, 30);
    
    
            $ioServer = new IoServer(
                new HttpServer($wsServer),
                $webSocketServer
            );
    
            $loop->run();
        }
    

    MessageHandler 类(如 Github 问题中的 Pusher)基本上存储所有连接并处理连接打开/关闭和消息事件。

    class MessageHandler implements MessageComponentInterface
    {
        protected $connections;
    
        public function __construct()
        {
            $this->connections = new SplObjectStorage;
        }
    
        public function onOpen(ConnectionInterface $conn)
        {
            $this->connections->attach($conn);
        }
    
        public function onMessage(ConnectionInterface $from, $msg)
        {
            try {
                $content = json_decode($msg, true);
                if ($content || !isset($content["uid"])) {
                    $this->connections->rewind();
                    while ($this->connections->valid()) {
                        if ($this->connections->current() === $from) {
                            $this->connections->setInfo($content["uid"]);
                        }
                        $this->connections->next();
                    }
                } else {
                    echo "Invalid message content: " . $msg;
                }
            } catch (\Throwable $t) {
               echo $t->getMessage();
            }
        }
    
        public function onClose(ConnectionInterface $conn)
        {
            $this->connections->detach($conn);
        }
    
        public function onError(ConnectionInterface $conn, Exception $e)
        {
            $this->connections->detach($conn);
            $conn->close();
        }
    
        /**
         * @param string|null $uid
         * @return ConnectionInterface[]|null
         */
        public function getUserConnections($uid)
        {
            $connections = [];
            $this->connections->rewind();
            while ($this->connections->valid()) {
                if ($this->connections->getInfo() == $uid) {
                    $connections[] = $this->connections->current();
                }
                $this->connections->next();
            }
            return $connections;
        }
    
        /**
         * @return ConnectionInterface[]|null
         */
        public function allConnections()
        {
            $connections = [];
            $this->connections->rewind();
            while ($this->connections->valid()) {
                $connections[] = $this->connections->current();
                $this->connections->next();
            }
            return $connections;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2013-07-22
      • 1970-01-01
      • 1970-01-01
      • 2023-03-04
      • 1970-01-01
      • 1970-01-01
      • 2013-07-07
      • 1970-01-01
      • 2020-03-14
      相关资源
      最近更新 更多