【问题标题】:RabbitMQ separate listeners by typeRabbitMQ 按类型分离监听器
【发布时间】:2017-11-13 19:46:16
【问题描述】:

我有 POJO,它表示发送给 Rabbit MQ 的消息。有一个整数负责消息的类型(无论是updateremoveadd 等等):

public class Message {
    private String field1;
    private String field2;

    private Integer type;
    ...
    <some other fields>
}

我有一个消费者在我的 Spring Boot 应用程序中接受此类消息。所以为了分别处理每种类型,我必须在我的代码中添加一些开关/案例结构。

对于这种情况有没有更明确的解决方案?

【问题讨论】:

    标签: spring spring-boot rabbitmq amqp spring-amqp


    【解决方案1】:

    您可以将 Spring Integration 与路由器一起使用...

    Rabbit Inbound channel adapter -> router -> 
    

    路由器根据类型路由到不同的服务激活器(方法)。

    编辑

    这是一个例子:

    @SpringBootApplication
    public class So47272336Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47272336Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
            return args -> {
                rabbitTemplate.convertAndSend("my.queue", new Domain(1, "foo"));
                rabbitTemplate.convertAndSend("my.queue", new Domain(2, "bar"));
                rabbitTemplate.convertAndSend("my.queue", new Domain(3, "baz"));
            };
        }
    
        @Bean
        public Queue queue() {
            return new Queue("my.queue");
        }
    
        @Bean
        public IntegrationFlow flow(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "my.queue"))
                    .route("payload.type", r -> r
                            .subFlowMapping("1", f -> f.handle("bean", "add"))
                            .subFlowMapping("2", f -> f.handle("bean", "remove"))
                            .subFlowMapping("3", f -> f.handle("bean", "update")))
                    .get();
        }
    
        @Bean
        public MyBean bean() {
            return new MyBean();
        }
    
        public static class MyBean {
    
            public void add(Domain object) {
                System.out.println("Adding " + object);
            }
    
            public void remove(Domain object) {
                System.out.println("Removing " + object);
            }
    
            public void update(Domain object) {
                System.out.println("Updating " + object);
            }
    
        }
    
        public static class Domain implements Serializable {
    
            private final Integer type;
    
            private final String info;
    
            public Domain(Integer type, String info) {
                this.type = type;
                this.info = info;
            }
    
            public Integer getType() {
                return this.type;
            }
    
            public String getInfo() {
                return this.info;
            }
    
            @Override
            public String toString() {
                return "Domain [type=" + this.type + ", info=" + this.info + "]";
            }
    
        }
    
    }
    

    【讨论】:

    • 我加了一个例子。
    • 是否有任何选项可以只使用没有弹簧集成的 RabbitMQ?
    • 否;没有内置任何东西;该框架对您的域对象一无所知。您可以在应用程序 bean 周围使用包装器并将开关放入包装器中;至少这会将方法路由逻辑与您的业务逻辑分开。
    猜你喜欢
    • 1970-01-01
    • 2018-03-27
    • 1970-01-01
    • 2019-08-05
    • 2018-10-25
    • 1970-01-01
    • 1970-01-01
    • 2021-01-12
    • 1970-01-01
    相关资源
    最近更新 更多