上篇概述RabbitMQ和Kafka的入门, 此篇主要概述Spring could Stream集成 RabbitMQ和Kafka 开发消息微服务!

承接上篇 微服务与消息驱动(RabbitMQ,Kafka)之入门篇 

概述:Spring could Stream帮我们做了一定程度的简化,只需少量代码配置就可以实现两个框架的功能,不需要调用API

1.准备工作

    首先创建几个项目工程(自行创建),本案例默认使用 RabbitMQ

  • spring-server: Eureka服务器,端口 8761
  • spring-consumer: 消息消费者, Eureka客户端 ,端口 8080
  • spring-producer:  消息生产者,Eureka客户端 ,端口 8081

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

2. 消息生产者:

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-config</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<!-- 
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>
		 -->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>

    主要引入 spring-cloud-starter-stream-rabbit 依赖,引入后会自动帮我们项目引入spring-cloud-stream 以及spring-cloud-stream-binder.如果想使用kafka请将spring-cloud-starter-stream-kafka引入

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

3. 消息的消费者:

    消费者项目工程所使用的依赖与生产者一致,先了解生产者与消费者在微服务使用,后面再一起整合测试

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

 微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

 依次启动 spring-server(8761端口),spring-producer(8081),spring-consumer(8080),

 访问 http://localhost:8081/send

  打开消费者控制台,可以看到 hello world 输出,表示消费者已经从消息代理中获取到消息

  大多数情况下我们可以不必编写服务接口,甚至不必使用@input或@output注解,如下图第二个方法微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

   为了简化开发 Spring could Stream 内置了3个接口: Sink,Source和Processor, Processor接口继承于Sink与Source,在实际应用中可以考虑只使用Processor接口,最后给大家演示

Sink接口定义源码:

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

Source接口定义源码:

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

 Processor接口定义源码:

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

4. 消费者组:

    前面讲述kafka框架的时候提到了消费者组的概念,在Spring could Stream中同样引入了这个概念.

当消费者组相同时,对于发送过来的消息仅由其中一个消费者实例处理,如果不同则会发送给全部的消费者实例

新建两个消费者项目,配置他们的消费者组为groupB,原来的为消费者为groupA

  • spring-second-consumer : 端口为8082,消息监听器中会输出 second consumer字符
  • spring-third-consumer : 端口为8083,消息监听器中会输出 third consumer字符

 微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

依次启动server,消息生产者,消息消费者(groupA和groupB),启动后访问生产者的发送地址 http://localhost:8081/send

在多吃发送后可以看到,每一次发送的消息原来的消费者都会接收到,后面加的两个消费者则会轮询处理

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

 

上面我们说过为了简化代码只需要使用Processor接口,下面我们来看一下具体代码

微服务与消息驱动(RabbitMQ,Kafka)之开发消息微服务

这里编写的是消费者,与生产者一致需在启动类中绑定消息通道,不需要编写消息通道接口只需要在启动类中加上 Processor.class,在接收或者发送消息时在注解@StreamListener中指定输入消息(Processor.INPUT)还是发送消息(Processor.OUTPUT). 生产者同理可进行重构在此不在重复啰嗦了

后续再更新其它内容,敬请关注

相关文章: