【发布时间】:2021-12-07 17:20:00
【问题描述】:
我已经按照这份手册搭建了 WebSocket: http://socketo.me/docs/hello-world ,并且它可以正常工作。
但现在我需要从 PHP 向客户端发送消息,我不知道该怎么做。我找到了一些基于 RabbitMQ(我的项目中正在使用它)的教程,例如: https://github.com/ratchetphp/Ratchet/issues/659 ,但我不明白该如何使用它。
也许有人知道?
【问题讨论】:
我已经按照这份手册搭建了 WebSocket: http://socketo.me/docs/hello-world ,并且它可以正常工作。
但现在我需要从 PHP 向客户端发送消息,我不知道该怎么做。我找到了一些基于 RabbitMQ(我的项目中正在使用它)的教程,例如: https://github.com/ratchetphp/Ratchet/issues/659 ,但我不明白该如何使用它。
也许有人知道?
【问题讨论】:
您可以创建一个 Symfony 控制台命令(console command),用它同时启动 RabbitMQ 消费者和 WebSocket 服务器,并在从队列收到消息后把消息发送给客户端。
/**
 * Starts a RabbitMQ consumer and a WebSocket server on one shared event loop.
 *
 * Every message consumed from the queue named by $_ENV['RABBITMQ_QUEUE'] is
 * expected to be a JSON object with a "uid" key (the recipient) and an "event"
 * key (the payload). The JSON-encoded "event" is sent to every WebSocket
 * connection that MessageHandler has associated with that uid.
 *
 * Connection settings are read from $_ENV (RABBITMQ_HOST, RABBITMQ_PORT,
 * RABBITMQ_VHOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD, RABBITMQ_QUEUE,
 * RABBITMQ_EXCHANGE, WEBSOCKET_PORT).
 *
 * @param InputInterface  $input  Console input (unused).
 * @param OutputInterface $output Console output (unused).
 *
 * @return int Exit code; 0 once the event loop has stopped.
 */
protected function execute(InputInterface $input, OutputInterface $output): int
{
    $loop = LoopFactory::create();
    $pusher = new MessageHandler();

    $queueAsyncClient = new AsyncClient($loop, [
        "host" => $_ENV['RABBITMQ_HOST'],
        "port" => $_ENV['RABBITMQ_PORT'],
        "vhost" => $_ENV['RABBITMQ_VHOST'],
        "user" => $_ENV['RABBITMQ_USERNAME'],
        "password" => $_ENV['RABBITMQ_PASSWORD'],
    ]);

    $queueAsyncClient->connect()
        // When the client has connected, retrieve a channel (as a promise).
        ->then(function (AsyncClient $client) {
            return $client->channel();
        })
        // Then declare the queue and the exchange and bind them together.
        ->then(function (Channel $channel) {
            // These calls all return promises, so combine them; the channel
            // itself is passed along as the first element of the result.
            return \React\Promise\all([
                $channel,
                // Create the queue we'll be using.
                $channel->queueDeclare($_ENV['RABBITMQ_QUEUE'], false, true),
                // Declare a direct exchange.
                $channel->exchangeDeclare($_ENV['RABBITMQ_EXCHANGE'], 'direct', false, true),
                // Bind the queue to the exchange.
                $channel->queueBind($_ENV['RABBITMQ_QUEUE'], $_ENV['RABBITMQ_EXCHANGE']),
            ]);
        })
        // Then, when everything is hooked up, start consuming with the pusher.
        ->then(function ($results) use ($pusher) {
            /** @var Channel $channel (first element of the all() promise above) */
            $channel = $results[0];

            return $channel->consume(
                function (Message $message) use ($pusher) {
                    $content = json_decode($message->content, true);

                    // Reject malformed payloads. Without this check a missing
                    // "uid" would be looked up as null, which matches every
                    // connection that has not registered a uid yet.
                    if (!is_array($content) || !isset($content["uid"]) || !array_key_exists("event", $content)) {
                        echo "Invalid queue message content: " . $message->content . PHP_EOL;

                        return;
                    }

                    foreach ($pusher->getUserConnections($content["uid"]) as $userConnection) {
                        $userConnection->send(json_encode($content["event"]));
                    }
                },
                $_ENV['RABBITMQ_QUEUE'],
                '',
                false,
                true // noAck: no explicit acknowledgement is sent by this consumer
            );
        })
        ->done();

    // NOTE(review): the "/ws" suffix is part of the URI handed to TcpServer;
    // confirm the socket layer accepts/ignores a path component here.
    $webSocketServer = new \React\Socket\TcpServer("tcp://0.0.0.0:" . $_ENV['WEBSOCKET_PORT'] . "/ws", $loop);

    $wsServer = new WsServer($pusher);
    $wsServer->enableKeepAlive($loop, 30);

    // Constructing the IoServer wires the HTTP/WebSocket stack onto the socket;
    // the instance itself is not used afterwards.
    $ioServer = new IoServer(
        new HttpServer($wsServer),
        $webSocketServer
    );

    // Blocks until the loop is stopped.
    $loop->run();

    // The declared return type is int, so an explicit exit code is required.
    return 0;
}
MessageHandler 类(如 Github 问题中的 Pusher)基本上存储所有连接并处理连接打开/关闭和消息事件。
/**
 * Tracks every open WebSocket connection and the uid each one registered.
 *
 * Connections are kept in an SplObjectStorage; the data attached to each
 * connection is the uid the client sent in a JSON message of the form
 * {"uid": ...}. Until a client sends such a message its attached data is null.
 */
class MessageHandler implements MessageComponentInterface
{
    /** @var SplObjectStorage Open connections, each mapped to its uid (or null). */
    protected $connections;

    public function __construct()
    {
        $this->connections = new SplObjectStorage;
    }

    /**
     * Registers a newly opened connection (no uid associated yet).
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->connections->attach($conn);
    }

    /**
     * Handles a client message; a valid message is a JSON object with a "uid"
     * key, which is stored as the identity of the sending connection.
     *
     * Anything else (invalid JSON, non-object JSON, missing uid) is reported
     * on standard output and otherwise ignored.
     *
     * @param ConnectionInterface $from Connection the message came from.
     * @param string              $msg  Raw message body.
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        try {
            $content = json_decode($msg, true);

            // Both conditions must hold: the payload decoded to an array AND
            // it carries a uid.
            if (is_array($content) && isset($content["uid"])) {
                // Only update connections that are still tracked, so that a
                // late message does not re-attach a closed connection.
                if (isset($this->connections[$from])) {
                    $this->connections[$from] = $content["uid"];
                }
            } else {
                echo "Invalid message content: " . $msg;
            }
        } catch (\Throwable $t) {
            echo $t->getMessage();
        }
    }

    /**
     * Forgets a connection that has been closed.
     */
    public function onClose(ConnectionInterface $conn)
    {
        $this->connections->detach($conn);
    }

    /**
     * Forgets a connection that raised an error and closes it.
     */
    public function onError(ConnectionInterface $conn, Exception $e)
    {
        $this->connections->detach($conn);
        $conn->close();
    }

    /**
     * Returns all connections whose registered uid equals the given one.
     *
     * @param string|null $uid
     * @return ConnectionInterface[] Possibly empty list of matching connections.
     */
    public function getUserConnections($uid)
    {
        $connections = [];
        foreach ($this->connections as $conn) {
            // Loose comparison: 5 and "5" match. Note that a connection with
            // no registered uid (null) also equals null, "" and 0.
            if ($this->connections[$conn] == $uid) {
                $connections[] = $conn;
            }
        }

        return $connections;
    }

    /**
     * Returns every tracked connection, regardless of uid.
     *
     * @return ConnectionInterface[] Possibly empty list of connections.
     */
    public function allConnections()
    {
        return iterator_to_array($this->connections, false);
    }
}
【讨论】: