2020年10月19日15:57:24

 个人一点学习和使用rabbitmq,先理解其中概念,不然使用起来十分混乱

php使用rabbitmq的相关博客还是相对较少的,java的偏多一些,我也是参考一些java博客才算是搞清楚

环境php7.3 laravel 8.0 一部分原因也是测试一下 laravel 8.0的改变

安装参考

composer require php-amqplib/php-amqplib

https://www.cnblogs.com/zx-admin/p/13825182.html

先贴代码

BaseRabbitmqService

<?php

namespace App\Service;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class BaseRabbitmqService {

    //死信队列和交换机
    public static $dlxQueue = 'dlx.queue';
    public static $dlxExchange = 'dlx.exchange';
    public static $dlxKey = 'dlxKey';
    //死信之后的队列和交换机
    public static $normalQueue = 'normal.queue';
    public static $normalExchange = 'normal.exchange';
    public static $normalKey = 'normalKey';
    //消息发布者的routing_key
    public static $msgKey = 'msgkey';

    private static function getConfig() {
        $isOnline = config('system.is_online');
        if ($isOnline) {
            return config('system.online');
        } else {
            return config('system.offline');
        }
    }

    public static function getConnection() {
        $config = self::getConfig();

        $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
        self::init($connection);
        return $connection;
    }

    //初始化一些队列信息
    private static function init(&$connection) {
        $channel = $connection->channel();

        //定义交换机
        $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
        $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);

        //定义队列,在正常队列超时之后就送去死信队列
        $args = new AMQPTable();
        // 消息过期方式:设置 queue.normal 队列中的消息5s之后过期,毫秒单位
        $args->set('x-message-ttl', 5000);
        // 设置队列最大长度方式: x-max-length
        //$args->set('x-max-length', 1);
        $args->set('x-dead-letter-exchange', self::$dlxExchange);
        $args->set('x-dead-letter-routing-key', self::$msgKey);
        $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
        $channel->queue_declare(self::$dlxQueue, false, true, false, false);

        $channel->queue_bind(self::$normalQueue, self::$normalExchange);
        $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
    }

}
View Code

相关文章: