【问题标题】:How to detect dead RabbitMQ connection?如何检测死的 RabbitMQ 连接?
【发布时间】:2017-02-01 23:43:23
【问题描述】:

我在 Go 中有一个 RabbitMQ 消费者脚本。这是来自RabbitMQ tutorial 的一个简单脚本,它使用streadway/amqp 库。

问题是如果RabbitMQ服务器停止,消费者脚本不会退出;并且当 RabbitMQ 服务器重启时,消费者不再接收消息。

有没有办法检测消费者连接已死并重新连接,或者至少终止消费者脚本?

我知道库设置了默认的 10 秒。连接的心跳间隔;有可能以某种方式使用它吗?

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test_task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            d.Ack(false)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

【问题讨论】:

    标签: go rabbitmq amqp


    【解决方案1】:

    amqp.Connection 有方法NotifyClose(),它返回通道信号传输或协议错误。 所以像

    for {  //reconnection loop
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") //setup
        notify := conn.NotifyClose(make(chan *amqp.Error)) //error channel
    ...
        ch, err := conn.Channel()
        msgs, err := ch.Consume(
    ...
        for{  //receive loop
            select {  //check connection
                case err = <-notify:
                //work with error
                break //reconnect
            case d = <- msgs:
                //work with message
            ...
            }
        }
    }
    

    【讨论】:

    • 需要注意的是,如果有人在连接上调用 Close,NotifyClose 将返回 nil。这允许区分错误(可能需要重新连接)和正常的应用程序终止。
    【解决方案2】:

    有几种方法可以做到这一点:检查传递通道是否关闭或使用Channel.NotifyClose

    检查投放渠道

    启动consumer后,你会从delivery channel收到。如您所知,receive 操作可能采用特殊形式 x, ok := &lt;-ch,其中 ok 为 false,而 x 的值为零由于通道被关闭(并且为空):

    conn, _ := amqp.Dial(url)
    ch, _ := conn.Channel()
    
    delivery, _ := ch.Consume(
            queueName,
            consumerName,
            true,  // auto ack
            false, // exclusive
            false, // no local
            true,  // no wait,
            nil,   // table
        )
    
    for {
        payload, ok := <- delivery
        if !ok {
            // ... channel closed
            return
        }
    }
    

    这是可行的,因为当 AMQP 通道关闭或发生错误时,Go 通道 &lt;-chan amqp.Delivery 将被关闭:

    [It] 继续向返回的 chan Delivery 传递,直到 Channel.Cancel、Connection.Close、Channel.Close 或发生 AMQP 异常。

    使用Channel.NotifyClose

    这很简单。而且原理是一样的:

    NotifyClose 为服务器以 Connection.Close 或 Channel.Close 方法的形式发送通道或连接异常时注册一个监听器。

    NotifyClose 返回的频道与您作为参数传递的频道相同;该方法只在内部注册它,所以你可以这样做:

    errC := ch.NotifyClose(make(chan *amqp.Error, n))
    

    其中n 是一个非零缓冲区大小。 确保将缓冲通道传递给NotifyClose,否则,根据您的代码结构,库可能会在发送时阻塞。

    然后您可以在errC 频道上接收并根据您收到的错误类型采取措施。简而言之,错误可能是:

    • 连接错误,通常无法恢复
    • 通道错误,也称为软异常,通常可以通过重置连接来恢复
    • nil 如果程序故意调用conn.Close()

    要知道错误是否可恢复,您可以检查amqp.ErrorCode 字段和/或Recover 字段,在出现软异常时设置为true。

    以下函数显示了如何区分错误代码 - 这是作为额外的见解提供的。对于一般情况,只需检查Error.Recover

    const (
        ConnectionError = 1
        ChannelError    = 2
    )
    
    func isConnectionError(err *amqp.Error) bool {
        return errorType(err.Code) == ConnectionError
    }
    
    func isChannelError(err *amqp.Error) bool {
        return errorType(err.Code) == ChannelError
    }
    
    func errorType(code int) int {
        switch code {
        case
            amqp.ContentTooLarge,    // 311
            amqp.NoConsumers,        // 313
            amqp.AccessRefused,      // 403
            amqp.NotFound,           // 404
            amqp.ResourceLocked,     // 405
            amqp.PreconditionFailed: // 406
            return ChannelError
    
        case
            amqp.ConnectionForced, // 320
            amqp.InvalidPath,      // 402
            amqp.FrameError,       // 501
            amqp.SyntaxError,      // 502
            amqp.CommandInvalid,   // 503
            amqp.ChannelError,     // 504
            amqp.UnexpectedFrame,  // 505
            amqp.ResourceError,    // 506
            amqp.NotAllowed,       // 530
            amqp.NotImplemented,   // 540
            amqp.InternalError:    // 541
            fallthrough
    
        default:
            return ConnectionError
        }
    }
    

    【讨论】:

      【解决方案3】:

      没有发现go-amqp库实现了连接池的断连重连功能。
      github上有基于Amqp二次打包的开源代码。 支持断线重连和异常重连。代码使用起来也比较简单,每个服务都有连接和通道。

      Source Code here

      示例代码:

      package main
       
      import (
          "go-rabbit/rabbit"
      )
       
      /*
          support isconnection and reconnection function
          And Failure re-send function
          @author : Bill
      */
      func main() {
          var(
              addr = "amqp://guest:guest@localhost:5672/"
              queue = "testQueue"
              exchange = "test_exchange"
              routerKey = "/test"
              msg = "test1!"
       
              //delay
              delayQueue = "delay_queue"
              delayExchange = "delay_exchange"
              delayRouterKey = "delay_exchange"
              prefix = "v1_prefix"
              sep = "_"
              eType = "F"
              _ttl = 60 * 1000
          )
       
          var rabbitProduct1 = rabbit.NewRabbitProduct(addr,_ttl,prefix,sep,delayExchange,delayQueue,delayRouterKey)
          // register recycle
          go rabbitProduct1.InitDefdelay(false)
          go rabbitProduct1.InitDefdelay(true)
          go rabbitProduct1.RegisterDelayWithPreFix("delay_queue","delay_exchange","delay_exchange")
       
          // ttl is dead recycle time if ttl > 0 then recycle
          rabbitProduct1.PubMessage(true,eType,queue,exchange,routerKey,msg,rabbitProduct1.GetBool(1),rabbitProduct1.GetBool(0),_ttl)
       
      }
      

      希望它能帮助你或给你一些想法

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-06-14
        • 2014-06-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多