【问题标题】:RabbitMQ consumer in GoGo 中的 RabbitMQ 消费者
【发布时间】:2016-07-25 00:26:17
【问题描述】:

我正在尝试用 Go 编写一个 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。此外,假设确认是否成功处理,否则发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。 我有几个问题:

  1. RabbitMq-go Reference 中是否有 BasicConsumerEventingBasicConsumer 的概念?
  2. RabbitMQ 中的Model 是什么,RabbitMq-go 中是否存在?
  3. 如何在死信队列失败时发送对象并在ttl 之后再次重新排队它们
  4. 下面代码中ch.Consume函数中consumerTag参数的意义是什么
  5. 对于这种情况,我们应该使用channel.Get() 还是channel.Consume()

为了满足上述要求,我需要在以下代码中进行哪些更改。我问这个是因为我找不到 RabbitMq-Go 的像样文档。

   func main() {

        consumer()        
    }

    func consumer() {

        objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}      
        initializeConn(&objConsumerConn.conn)


        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        msgs, err := ch.Consume(
                objConsumerConn.queueName, // queue
                "demo1",     // consumerTag
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {                  
                k := new(EventCaptureData)
                b := bytes.Buffer{}
                b.Write(d.Body)
                dec := gob.NewDecoder(&b)  
                err := dec.Decode(&k)
                d.Ack(true)  

                if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
                    fmt.Println(k)                        
            }
        }()      

        log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
        <-forever

    }

已编辑的问题:

我已按照链接link1link2 中的建议延迟处理消息。但问题是,即使在 ttl 之后,消息也会从死信队列返回到其原始队列。我正在使用RabbitMQ 3.0.0。谁能指出是什么问题?

【问题讨论】:

标签: go rabbitmq rabbitmq-exchange rabbitmqctl


【解决方案1】:

中是否有 BasicConsumer 与 EventingBasicConsumer 的概念 RabbitMq-go 参考?

不完全是,但Channel.GetChannel.Consume 调用服务于类似的概念。使用Channel.Get,您有一个非阻塞调用,如果有任何可用消息,您将获得第一条消息,或者返回ok=false。使用Channel.Consume,排队的消息被传递到一个频道。

什么是 RabbitMQ 中的模型,它在 RabbitMq-go 中有吗?

如果您指的是 C# RabbitMQ 中的 IModelConnection.CreateModel,那是来自 C# 库的东西,而不是来自 RabbitMQ 本身。这只是尝试从 RabbitMQ“通道”术语中抽象出来,但它从未流行起来。

如何在死信队列失败时再次发送对象 ttl 后重新排队

delivery.Nack 方法与requeue=false 一起使用。

ch.Consume 中的 consumerTag 参数有什么意义 下面代码中的函数

ConsumerTag 只是一个消费者标识符。可用于取消channel.Cancel 的通道,并识别负责交付的消费者。使用channel.Consume 传递的所有消息都将设置ConsumerTag 字段。

对于这种情况,我们应该使用channel.Get() 还是channel.Consume()

我认为channel.Get() 几乎永远不会优于channel.Consume()。使用channel.Get,您将轮询队列并无所事事地浪费 CPU,这在 Go 中没有意义。

我需要在以下代码中进行哪些更改才能满足上述要求 要求。

  1. 由于您一次批处理 5 个,因此您可以有一个从消费者通道接收的 goroutine,一旦它收到 5 个交付,您就调用另一个函数来处理它们。

  2. 要确认或发送到死信队列,您将使用delivery.Ackdelivery.Nack 函数。您可以使用multiple=true 并为批处理调用一次。消息进入死信队列后,您必须检查 delivery.Headers["x-death"] 标头以了解其死信次数,并在已重试 5 次时调用 delivery.Reject

  3. 使用channel.NotifyCancel处理取消事件。

【讨论】:

  • 非常感谢您的详细解释。
  • @Pedro..我有一个问题。如果我使用 d.Ack(true) 或 d.Ack(false) 它不会在死信队列中发布消息。在 d.Nack(true, false) 的情况下,它会发布。但是在 ttl 之后,它会从那里删除消息。那么,实现相同的价值是什么
  • @Naresh 这是一个新的 RabbitMQ 问题,与此处的 Go 问题无关。您应该用它创建一个新问题。
  • 我创建了新问题。你能告诉我为什么它会表现出这种行为http://stackoverflow.com/questions/36503804/dead-letterred-messages-not-getting-requeue-to-original-queue-after-ttl
猜你喜欢
  • 1970-01-01
  • 2011-04-15
  • 1970-01-01
  • 2020-09-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-20
  • 2014-03-19
相关资源
最近更新 更多