【发布时间】:2016-07-25 00:26:17
【问题描述】:
我正在尝试用 Go 编写一个 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。此外,假设确认是否成功处理,否则发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。 我有几个问题:
- RabbitMq-go Reference 中是否有
BasicConsumer与EventingBasicConsumer的概念? - RabbitMQ 中的
Model是什么,RabbitMq-go 中是否存在? - 如何在死信队列失败时发送对象并在
ttl之后再次重新排队它们 - 下面代码中
ch.Consume函数中consumerTag参数的意义是什么 - 对于这种情况,我们应该使用
channel.Get()还是channel.Consume()?
为了满足上述要求,我需要在以下代码中进行哪些更改。我问这个是因为我找不到 RabbitMq-Go 的像样文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
已编辑的问题:
我已按照链接link1link2 中的建议延迟处理消息。但问题是,即使在 ttl 之后,消息也会从死信队列返回到其原始队列。我正在使用RabbitMQ 3.0.0。谁能指出是什么问题?
【问题讨论】:
-
尝试使用 amqp 包与 rabbit 交互,它有一个非常好的文档godoc.org/github.com/streadway/amqp
-
@PerroVerd 这就是我正在使用的。
标签: go rabbitmq rabbitmq-exchange rabbitmqctl