【问题标题】:How to delay? - php-amqplib怎么拖延? - php-amqplib
【发布时间】:2014-03-23 10:02:09
【问题描述】:

我想知道如何使用 php-amqplib 实现消息的延迟投递。

我参考了这个很棒的 CoffeeScript 教程:

https://github.com/jamescarr/rabbitmq-scheduled-delivery

但它似乎不适用于 PHP-amqplib。

消息如我所愿过期,但似乎“x-dead-letter-exchange”不起作用。我使用了 RabbitMQ 管理控制台,可以实时看到所有队列的创建和删除。但是我的消息在过期后并没有进入 immediate(即时)队列。我用的是 RabbitMQ 3.2.3 版本,php-amqplib 2.2.* 版本。

这是我的代码:

连接类:

/**
 * Thin wrapper around a php-amqplib connection and channel.
 *
 * The constructor declares the "immediate" direct exchange and the
 * "right.now.queue" queue and binds them together. createDelayedQueue()
 * declares a temporary queue whose expired messages are dead-lettered back
 * to that exchange.
 */
class Connection
{
    /**
     * @var mixed Channel returned by $conn->channel().
     */
    public $ch;

    /**
     * @var string Consumer tag passed to basic_consume() by consumers.
     */
    public $consumer_tag;

    /**
     * @var string Name of the exchange that receives dead-lettered messages.
     */
    public $exchange;

    /**
     * @var string Name of the queue consumers read from.
     *
     * Declared explicitly: it is assigned in the constructor and read from
     * outside the class, and undeclared (dynamic) properties are deprecated
     * in recent PHP versions.
     */
    public $queue;

    /**
     * @var mixed The AMQPConnection instance created in the constructor.
     */
    public $conn;

    /**
     * Opens the connection and a channel, then declares and binds the
     * immediate exchange and queue.
     *
     * @param string $host
     * @param int    $port
     * @param string $user
     * @param string $password
     * @param string $vhost
     */
    public function __construct($host, $port, $user, $password, $vhost)
    {
        $this->exchange = 'immediate';
        $this->queue = 'right.now.queue';
        $this->consumer_tag = 'consumer';

        $this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
        $this->ch = $this->conn->channel();

        $this->ch->exchange_declare($this->exchange, 'direct', false, true, false);
        $this->ch->queue_declare($this->queue, false, true, false, false, false);

        // Original binding (empty routing key), kept so existing publishers
        // that target the exchange with an empty key keep working.
        $this->ch->queue_bind($this->queue, $this->exchange);

        // Additional binding keyed by the queue name. A direct exchange only
        // routes on an exact key match, and createDelayedQueue() dead-letters
        // with this key.
        $this->ch->queue_bind($this->queue, $this->exchange, $this->queue);
    }

    /**
     * Declares a temporary queue that holds messages for $delay_seconds and
     * then dead-letters them to the immediate exchange.
     *
     * Without "x-dead-letter-routing-key" a dead-lettered message keeps the
     * routing key it was published with (typically the delayed queue name),
     * which matches no binding on the direct exchange, so the message would
     * be dropped.
     *
     * @param string    $name          Name of the delayed queue to declare.
     * @param int|float $delay_seconds Delay before messages are dead-lettered.
     */
    public function createDelayedQueue($name, $delay_seconds)
    {
        // The 'I' type tag announces an integer, so make sure a fractional
        // delay does not produce a float.
        $ttl_ms = (int) round($delay_seconds * 1000);

        // Positional flags as in the original call: passive=false,
        // durable=false, exclusive=false, auto_delete=true, nowait=true.
        $this->ch->queue_declare($name, false, false, false, true, true, array(
            'x-dead-letter-exchange' => array('S', $this->exchange),
            'x-dead-letter-routing-key' => array('S', $this->queue),
            'x-message-ttl' => array('I', $ttl_ms),
            // The queue itself expires one second after the message TTL.
            'x-expires' => array('I', $ttl_ms + 1000),
        ));
    }
}

发布代码

// Create a per-message delayed queue with a 2 second delay.
$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
// Publish through the default exchange ('') with the queue name as routing
// key so the message actually lands in the delayed queue. Calling
// basic_publish($msg) alone uses an empty routing key, which matches no
// queue, and the message is silently discarded.
$amqp->ch->basic_publish($msg, '', $name);

消费者代码

$amqp = $this->getContainer()->get('amqp_connexion');

// The 4th argument (no_ack) is false, so the broker waits for an explicit
// acknowledgement of every delivery.
$amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {
    echo $msg->body;
    echo "\n--------\n";

    // Acknowledge the delivery; otherwise the message stays unacked and is
    // redelivered once this consumer disconnects.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});

$output->writeln('Listening '.$amqp->queue.'...');

// Loop as long as the channel has callbacks registered
while (count($amqp->ch->callbacks)) {
    $amqp->ch->wait();
}

【问题讨论】:

    标签: php rabbitmq amqp php-amqp


    【解决方案1】:

    我刚刚为php写了一个简化的工作版本:

    /////// simplified ///////

    // Load the php-amqplib classes first (manual include or an autoloader).

    // Names for the "deliver now" pair and the "hold for five seconds" pair.
    $immediateQueue = 'right.now.queue';
    $immediateExchange = 'right.now.exchange';
    $holdingQueue = 'delayed.five.seconds.queue';
    $holdingExchange = 'delayed.five.seconds.exchange';

    // How long a message is held back, in seconds.
    $delaySeconds = 5;

    // Open the connection and a channel on it.
    $connection = new \PhpAmqpLib\Connection\AMQPConnection('localhost', 5672, 'guest', 'guest');
    $amqpChannel = $connection->channel();

    // Declare the immediate queue and its direct exchange, then bind them.
    $amqpChannel->queue_declare($immediateQueue);
    $amqpChannel->exchange_declare($immediateExchange, 'direct');
    $amqpChannel->queue_bind($immediateQueue, $immediateExchange);

    // Arguments of the holding queue: messages expire after the delay and are
    // then dead-lettered to the immediate exchange; the queue itself expires
    // one second later.
    $holdingQueueArguments = array(
        'x-message-ttl' => array('I', $delaySeconds * 1000),
        "x-expires" => array("I", $delaySeconds * 1000 + 1000),
        'x-dead-letter-exchange' => array('S', $immediateExchange)
    );

    // Declare the holding queue and its direct exchange, then bind them.
    $amqpChannel->queue_declare($holdingQueue, false, false, false, true, true, $holdingQueueArguments);
    $amqpChannel->exchange_declare($holdingExchange, 'direct');
    $amqpChannel->queue_bind($holdingQueue, $holdingExchange);

    // Publish a message carrying the current timestamp to the holding exchange.
    $messageProperties = array('delivery_mode' => 2);
    $timestampMessage = new \PhpAmqpLib\Message\AMQPMessage(time(), $messageProperties);
    $amqpChannel->basic_publish($timestampMessage, $holdingExchange);

    // Print how long each message took to travel from publish to consume.
    $reportElapsed = function(\PhpAmqpLib\Message\AMQPMessage $delivered) {
        $elapsedSeconds = time() - $delivered->body;
        echo 'seconds between publishing and consuming: ' . $elapsedSeconds . PHP_EOL;
    };
    $amqpChannel->basic_consume($immediateQueue, '', false, true, false, false, $reportElapsed);

    // Keep waiting for deliveries while the channel has registered callbacks.
    while (0 < count($amqpChannel->callbacks)) {
        $amqpChannel->wait();
    }
    

    【讨论】:

      【解决方案2】:

      如果您选择基于amqp interop 的传输方式,则根本不需要深入研究细节。只需要做几件事:

      安装 enqueue/amqp-lib 传输(顺便说一句,您也可以使用基于 amqp ext 或很棒的 bunny 库的其他传输)以及 enqueue/amqp-tools:

      composer require enqueue/amqp-lib enqueue/amqp-tools
      

      创建 amqp 上下文,添加延迟策略并发送延迟消息:

      <?php
      // Use the factory from the enqueue/amqp-lib package installed above.
      use Enqueue\AmqpLib\AmqpConnectionFactory;
      use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

      // Create the AMQP context and attach the dead-letter-exchange based
      // delay strategy.
      $context = (new AmqpConnectionFactory('amqp://'))->createContext();
      $context->setDelayStrategy(new RabbitMqDlxDelayStrategy());

      // Declare the destination queue.
      $queue = $context->createQueue('foo');
      $context->declareQueue($queue);

      $message = $context->createMessage('Hello world!');

      // Send the message with a delivery delay given in milliseconds.
      $context->createProducer()
          ->setDeliveryDelay(5000) // 5 sec
          ->send($queue, $message)
      ;
      

      顺便说一句,这不是唯一可用的策略。还有一种基于 RabbitMQ 延迟消息插件的策略,用法完全相同。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-11-21
        • 2018-10-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-07-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多