【问题标题】:RabbitMQBundle does not acknowledge message when producing message from consumerRabbitMQBundle 在从消费者生成消息时不确认消息
【发布时间】:2016-03-14 23:18:44
【问题描述】:

如何在确认消息的同时产生新消息?

当我从命令行启动消费者时,消息将保留在原始队列中。但新队列将在新队列中创建,无限循环。因为它一直在消耗未被确认的消息。

即使在消费者的 execute() 函数中返回 TRUE。应该承认它,就像它在文档中所说的那样。

我正在从消费者内部的回调中生成消息。这个生产者是使用标准 Symfony DI 注入的。

如果我删除发布新消息的方法,则消息被确认就好了...

services.yml

services:
  my_importlog_repository:
    class: Doctrine\ORM\EntityRepository
    factory_service: doctrine.orm.default_entity_manager
    factory_method: getRepository
    arguments: [AppBundle\Entity\MyImportlogEntity]
  my_distributor:
    class: AppBundle\DistributorImport\MyDistributor
    arguments: [@my_importlog_repository,@logger,@old_sound_rabbit_mq.my_download_producer, %my_config%]
  my_download:
    class: AppBundle\Consumer\MyDownloadConsumer
    arguments: [@logger,@old_sound_rabbit_mq.my_extract_producer,@my_distributor,%my_config%]
  my_extract:
    class: AppBundle\Consumer\MyExtractConsumer
    arguments: [@logger,@old_sound_rabbit_mq.my_convert_producer,@my_distributor,%my_config%]

config.yml

# rabbitmq
old_sound_rabbit_mq:
  connections:
    default:
      host:     '192.168.99.100'
      port:     5672
      user:     'guest'
      password: 'guest'
      vhost:    '/'
      lazy:     false
      connection_timeout: 60
      read_write_timeout: 60

      # requires php-amqplib v2.4.1+ and PHP5.4+
      keepalive: false

      # requires php-amqplib v2.4.1+
      heartbeat: 30
  producers:
    # my producers
    my_download:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_download'}
    my_extract:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_extract'}
    my_convert:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_convert'}
  consumers:
    # my consumers
    my_download:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_download'}
      callback:         my_download
      qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
      idle_timeout:     60
    my_extract:
      connection:       default
      exchange_options: {name: 'distributor_import', type: direct}
      queue_options:    {name: 'my_extract'}
      callback:         my_extract
      qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
      idle_timeout:     60

MyDownloadConsumer.php

<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class MyDownloadConsumer implements ConsumerInterface
{
  private $logger;
  private $producer;
  private $distributor;
  private $config;

  public function __construct(\Symfony\Component\HttpKernel\Log\LoggerInterface $logger, \OldSound\RabbitMqBundle\RabbitMq\Producer $producer, \AppBundle\DistributorImport\MyDistributor $distributor, Array $config)
  {
    $this->logger = $logger;
    $this->producer = $producer;
    $this->distributor = $distributor;
    $this->config = $config;
  }

  public function execute(\PhpAmqpLib\Message\AMQPMessage $message)
  {
    $data = unserialize($message->body);
    $this->producer->publish(serialize($data));
    return true;
  }
}

如果我删除

$data = unserialize($message->body);
$this->producer->publish(serialize($data));

它应该像它应该的那样工作......

【问题讨论】:

  • 如果你想重新排队消息,最后返回 false ;-)
  • @mmmm 这不是我想做的。我想向同一个队列发送一条新消息。或者另一个队列。但结果还是一样。原始消息未得到确认。

标签: php symfony rabbitmq message-queue


【解决方案1】:

能够从我的 Consumer execute() 方法中发布消息,同时还确认当前正在使用的消息。使用以下代码。

$message->delivery_info['channel']
    ->basic_publish(
        new AMQPMessage (serialize($data)),
        'name_of_my_exchange',
        'key.of.my.routing'
    );

直接在正在使用的消息的通道上发布。

【讨论】:

  • 这里只是评论说使用这种方法发送的消息不再持久......
猜你喜欢
  • 1970-01-01
  • 2020-03-09
  • 2013-08-01
  • 1970-01-01
  • 2016-11-11
  • 1970-01-01
  • 2018-02-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多