【问题标题】:How to disconnect from RabbitMQ properly using Perl's AnyEvent::RabbitMQ?如何使用 Perl 的 AnyEvent::RabbitMQ 正确断开与 RabbitMQ 的连接?
【发布时间】:2016-04-19 12:58:58
【问题描述】:

我想以适当的方式断开与 RabbitMQ 的连接。通过 Perl 的 AnyEvent::RabbitMQ(我正在使用)的 reviewing the source code,我发现了 close 方法,它似乎关闭了所有对 RabbitMQ 开放的通道。

所以我

  1. 连接到 RabbitMQ
  2. 打开了一个频道
  3. 宣布交换
  4. 绑定到该交易所
  5. 声明了一个队列
  6. 绑定到该队列
  7. AnyEvent::RabbitMQ 实例(不是::Channel 实例)上执行close 方法

连接似乎已关闭,但 RabbitMQ 日志显示“AMQP 连接”为“connection_closed_abruptly”。

这是该连接的完整 RabbitMQ 日志:

=INFO REPORT==== 14-Jan-2016::10:02:15 ===
accepting AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672)

=WARNING REPORT==== 14-Jan-2016::10:02:16 ===
closing AMQP connection <0.10868.0> (127.0.0.1:57764 -> 127.0.0.1:5672):
connection_closed_abruptly

这里是示例代码:

#!/usr/bin/perl
use strictures 1;

use AnyEvent::RabbitMQ;
use Data::Printer;

my ( $rabbitmq, $rabbitmq_channel );

my $condvar = AnyEvent->condvar;

$rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
  host       => '127.0.0.1',
  port       => 5672,
  user       => 'guest',
  pass       => 'guest',
  vhost      => '/',
  timeout    => 1,
  tls        => 0,
  tune       => { heartbeat => 1 },
  on_success => sub {
    ($rabbitmq) = @_;
    $rabbitmq->open_channel(
      on_success => sub {
        ($rabbitmq_channel) = @_;
        $rabbitmq_channel->confirm;
        $rabbitmq_channel->declare_exchange(
          exchange   => 'test_exchange',
          type       => 'fanout',
          on_success => sub {
            $rabbitmq_channel->bind_exchange(
              source      => 'test_exchange',
              destination => 'test_exchange',
              routing_key => '',
              on_success  => sub {
                $rabbitmq_channel->declare_queue(
                  queue      => 'test_queue',
                  on_success => sub {
                    $rabbitmq_channel->bind_queue(
                      queue       => 'test_queue',
                      exchange    => 'test_exchange',
                      routing_key => '',
                      on_success  => sub {
                        $rabbitmq->close;
                        undef $rabbitmq;
                      },
                      on_failure => sub { $condvar->send( __LINE__, @_ ) },
                    );
                  },
                  on_failure => sub { $condvar->send( __LINE__, @_ ) },
                );
              },
              on_failure => sub { $condvar->send( __LINE__, @_ ) },
            );
          },
          on_failure => sub { $condvar->send( __LINE__, @_ ) },
        );
      },
      on_failure => sub { $condvar->send( __LINE__, @_ ) },
      on_return  => sub { $condvar->send( __LINE__, @_ ) },
      on_close   => sub { $condvar->send( __LINE__, @_ ) },
    );
  },
  on_failure      => sub { $condvar->send( __LINE__, @_ ) },
  on_read_failure => sub { $condvar->send( __LINE__, @_ ) },
  on_return       => sub { $condvar->send( __LINE__, @_ ) },
  on_close        => sub { $condvar->send( __LINE__, @_ ) },
);

my $reason = [ $condvar->recv ];
p $reason;

如何使用 Perl 的 AnyEvent::RabbitMQ 正确断开与 RabbitMQ 的连接?

【问题讨论】:

    标签: perl rabbitmq message-queue anyevent


    【解决方案1】:

    有参考周期的指标。这些可以防止结构被正确破坏。

    1. my $rabbitmq; $rabbitmq = ...”大喊引用循环的可能性。

    2. my $rabbitmq_channel; $rabbitmq_channel = ...”大喊引用循环的可能性。

    3. $rabbitmq_channel$rabbitmq 拥有(存储在),但它也被$rabbitmq_channel 的事件处理程序捕获。

    标记为&lt;=== 的更改替换了不可接受的代码。

    标记为&lt;--- 的更改可能是必要的。如果$rabbitmq_channel 在回调中未定义,请移除此更改。

    use Scalar::Util qw( weaken );
    
    my $done_cv = AnyEvent->condvar;
    
    my $rabbitmq = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(  # <===
      host       => '127.0.0.1',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0,
      tune       => { heartbeat => 1 },
      on_success => sub {
        my ($rabbitmq) = @_;  # <===
        $rabbitmq->open_channel(
          on_success => sub {
            my ($rabbitmq_channel) = @_;  # <===
            {  # <---
              my $rabbitmq_channel = weaken($rabbitmq_channel);  # <---
              $rabbitmq_channel->confirm;
              $rabbitmq_channel->declare_exchange(
                exchange   => 'test_exchange',
                type       => 'fanout',
                on_success => sub {
                  $rabbitmq_channel->bind_exchange(
                    source      => 'test_exchange',
                    destination => 'test_exchange',
                    routing_key => '',
                    on_success  => sub {
                      $rabbitmq_channel->declare_queue(
                        queue      => 'test_queue',
                        on_success => sub {
                          $rabbitmq_channel->bind_queue(
                            queue       => 'test_queue',
                            exchange    => 'test_exchange',
                            routing_key => '',
                            on_success  => sub { $done_cv->send( __LINE__, @_ ) },  # <===
                            on_failure => sub { $done_cv->send( __LINE__, @_ ) },
                          );
                        },
                        on_failure => sub { $done_cv->send( __LINE__, @_ ) },
                      );
                    },
                    on_failure => sub { $done_cv->send( __LINE__, @_ ) },
                  );
                },
                on_failure => sub { $done_cv->send( __LINE__, @_ ) },
              );
            }  # <---
          },
          on_failure => sub { $done_cv->send( __LINE__, @_ ) },
          on_return  => sub { $done_cv->send( __LINE__, @_ ) },
          on_close   => sub { $done_cv->send( __LINE__, @_ ) },
        );
      },
      on_failure      => sub { $done_cv->send( __LINE__, @_ ) },
      on_read_failure => sub { $done_cv->send( __LINE__, @_ ) },
      on_return       => sub { $done_cv->send( __LINE__, @_ ) },
      on_close        => sub { $done_cv->send( __LINE__, @_ ) },
    );
    
    my $reason = [ $done_cv->recv ];
    p $reason;
    

    我希望这会有所帮助。

    【讨论】:

      【解决方案2】:

      问题是 AnyEvent::RabbitMQ.pm 库本身的一个错误。我不确定如何修复sub close 本身,但关键部分是它从不执行在全局解构期间将Connection::CloseConnection::CloseOk 方法发送到您的RabbitMQ 服务器的代码。设置好 AMQP 连接后,您可以通过以下操作进行确认。

      $rabbitmq->_push_write(Net::AMQP::Protocol::Connection::Close->new());
      $rabbitmq->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
      

      这有点麻烦,所以我正在研究正确的方法,希望维护者能接受拉取请求。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-03-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多