【问题标题】:Grails rabbitmq native filter messages to consumerGrails rabbitmq 原生过滤消息给消费者
【发布时间】:2016-12-20 16:11:17
【问题描述】:

我正在使用grails 3.2.3 版本和rabbitmq native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/)。我正在尝试实现以下场景。
描述:我将多条消息发送到带有标头的单个队列,在消费者部分,我尝试通过特定过滤应用绑定来消费消息。但是无论过滤如何,消费者都会消费所有消息 - 意味着绑定不起作用。我也是rabbitmq的初学者。因此,非常感谢任何帮助/指导。下面是我的代码。

application.groovy 中的队列配置:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
]

}

发送到队列功能:

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        if (headers != null) {
            headers = binding
        }
    }
}

sendToQueue 上,我将第三个参数设为可选,因为在某些情况下我不需要多种类型的消费者;

调用发送到队列:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])

消费者 1:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}

消费者 2:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}

【问题讨论】:

    标签: grails groovy rabbitmq grails-plugin rabbitmq-exchange


    【解决方案1】:

    阅读 rabbitmq 文档后,我意识到不可能有选择地从单个队列中提取消息。

    消费者从队列中接收所有消息

    虽然还有另一个选项"Exchange",发布者将发布消息以与路由密钥进行交换,并且这些消息将被传递到绑定队列。更多:RabbitMQ Publish/Subscribe Model
    这里也描述了基本思想:Stackoverflow: RabbitMQ selectively retrieving messages from a queue
    无论如何,在我的解决方案中,我不想要多个队列。所以我创建了一个消费者,并通过消息传递实际的处理程序类 bean 引用来调度消息。分享实现,希望对某人有所帮助:

    application.groovy 中的队列配置:

    rabbitmq {
        queues = [
            [
                    name      : "mail.queue",
                    connection: "defaultConnection",
                    durable   : true
            ]
        ]
    }
    

    发送到队列功能:

    protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
        message.queueHandlerServiceClass = queueHandlerServiceClass.name
        rabbitMessagePublisher.send {
            routingKey = queueType.queueName // queue name from enum: "mail.queue"
            body = message
            autoConvert = true
        }
    }
    

    处理程序接口:

    interface BaseQueueHandler {
        void handleMessage(Map message, MessageContext context)
    }
    

    发送到队列:

    sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)
    

    队列消费者:

    class EchoEmailQueueConsumer {
    
        static rabbitConfig = [
                queue   : QueueType.ECHO_EMAIL_QUEUE.queueName,
                consumer: 10
        ]
    
        GrailsApplication grailsApplication
    
        def handleMessage(Map message, MessageContext context) {
            String handlerClass = message.remove("queueHandlerServiceClass")
            Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
            BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
            queueService.handleMessage(message, context)
        }
    
    }
    

    终于实现了Handler接口的Handler服务:

    class PasswordResetEmailService implements BaseQueueHandler {
    
        @Override
        void handleMessage(Map message, MessageContext context) {
            println("message received in PasswordResetEmailService")
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2020-09-20
      • 2020-04-20
      • 2016-05-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-10-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多