上篇概述RabbitMQ和Kafka的入门, 此篇主要概述Spring could Stream集成 RabbitMQ和Kafka 开发消息微服务!
承接上篇 微服务与消息驱动(RabbitMQ,Kafka)之入门篇
概述:Spring could Stream帮我们做了一定程度的简化,只需少量代码配置就可以实现两个框架的功能,不需要调用API
1.准备工作
首先创建几个项目工程(自行创建),本案例默认使用 RabbitMQ
- spring-server: Eureka服务器,端口 8761
- spring-consumer: 消息消费者, Eureka客户端 ,端口 8080
- spring-producer: 消息生产者,Eureka客户端 ,端口 8081
2. 消息生产者:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
主要引入 spring-cloud-starter-stream-rabbit 依赖,引入后会自动帮我们项目引入spring-cloud-stream 以及spring-cloud-stream-binder.如果想使用kafka请将spring-cloud-starter-stream-kafka引入
3. 消息的消费者:
消费者项目工程所使用的依赖与生产者一致,先了解生产者与消费者在微服务使用,后面再一起整合测试
依次启动 spring-server(8761端口),spring-producer(8081),spring-consumer(8080),
访问 http://localhost:8081/send
打开消费者控制台,可以看到 hello world 输出,表示消费者已经从消息代理中获取到消息
大多数情况下我们可以不必编写服务接口,甚至不必使用@input或@output注解,如下图第二个方法
为了简化开发 Spring could Stream 内置了3个接口: Sink,Source和Processor, Processor接口继承于Sink与Source,在实际应用中可以考虑只使用Processor接口,最后给大家演示
Sink接口定义源码:
Source接口定义源码:
Processor接口定义源码:
4. 消费者组:
前面讲述kafka框架的时候提到了消费者组的概念,在Spring could Stream中同样引入了这个概念.
当消费者组相同时,对于发送过来的消息仅由其中一个消费者实例处理,如果不同则会发送给全部的消费者实例
新建两个消费者项目,配置他们的消费者组为groupB,原来的为消费者为groupA
- spring-second-consumer : 端口为8082,消息监听器中会输出 second consumer字符
- spring-third-consumer : 端口为8083,消息监听器中会输出 third consumer字符
依次启动server,消息生产者,消息消费者(groupA和groupB),启动后访问生产者的发送地址 http://localhost:8081/send
在多吃发送后可以看到,每一次发送的消息原来的消费者都会接收到,后面加的两个消费者则会轮询处理
上面我们说过为了简化代码只需要使用Processor接口,下面我们来看一下具体代码
这里编写的是消费者,与生产者一致需在启动类中绑定消息通道,不需要编写消息通道接口只需要在启动类中加上 Processor.class,在接收或者发送消息时在注解@StreamListener中指定输入消息(Processor.INPUT)还是发送消息(Processor.OUTPUT). 生产者同理可进行重构在此不在重复啰嗦了
后续再更新其它内容,敬请关注