目录

(1)引入依赖:

A、引入Spring Cloud管理依赖:

B、引入Spring Cloud Alibaba管理依赖:

C、引入Spring Cloud Stream管理依赖:

D、引入kafka依赖:

(2)修改配置文件:

A、生产者配置文件:

B、消费者配置文件:

(3)创建接口:

A、生产者接口:

B、消费者接口:

(4)修改启动类:

(5)编写代码:

A、发送消息代码:

B、消费消息代码:

(6)启动测试:


因为stream是一个消息驱动微服务框架,借助binder插件与第三方消息组件进行信息交互的,因此使用大致过程与stream+rocketmq是一样的,不同的是在配置信息上可能有所不同,因为rocketmq是阿里巴巴提供维护的,并不是stream官方维护的。

(1)引入依赖:

A、引入Spring Cloud管理依赖:

springCloud-Alibaba——stream+kafka消息微服务集成

B、引入Spring Cloud Alibaba管理依赖:

springCloud-Alibaba——stream+kafka消息微服务集成

C、引入Spring Cloud Stream管理依赖:

springCloud-Alibaba——stream+kafka消息微服务集成

D、引入kafka依赖:

springCloud-Alibaba——stream+kafka消息微服务集成

(2)修改配置文件:

A、生产者配置文件:

springCloud-Alibaba——stream+kafka消息微服务集成

B、消费者配置文件:

消费者配置rabbitmq的信息与生产者配置rabbitmq的信息几乎是完全一样的,只有bindings中对应的名称不一样而已,并且增加了一个分组group,保证集群部署,保证一条消息只让一个分组内的一个实例消费。

springCloud-Alibaba——stream+kafka消息微服务集成

(3)创建接口:

A、生产者接口:

创建一个自定义生产者的binding对应的接口,在接口中写一个out()方法,返回值为MessageChannel,并在方法上添加注解Output,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。

springCloud-Alibaba——stream+kafka消息微服务集成

B、消费者接口:

创建一个自定义消费者的binding对应的接口,在接口中写一个input()方法,返回值为SubscribableChannel,并在方法上添加注解input,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。

springCloud-Alibaba——stream+kafka消息微服务集成

(4)修改启动类:

在启动类上,绑定刚刚创建的自定义binding对应的那个接口,如果项目中有多个接口,那么就绑定多个接口。

springCloud-Alibaba——stream+kafka消息微服务集成

(5)编写代码:

A、发送消息代码:

首先通过Autowired创建自定义的接口对象,然后在方法体中,通过自定义接口对象调用output()方法发送消息。

springCloud-Alibaba——stream+kafka消息微服务集成

B、消费消息代码:

创建一个实体类,类上添加service注解,在类中创建一个方法,方法有一个string类型的参数(该参数就是消息体),方法上添加StreamListener,并指定是哪个input。

springCloud-Alibaba——stream+kafka消息微服务集成

(6)启动测试:

启动消息生产者、消息消费者,访问接口,然后就会显示发送消息成功,在消费者中也会消费刚才发送的消息。在kafka-manager的控制台中可以看到在topic的list列表中会看到对应的topic,并且kafka是将消息数据持久化到日志中的,因此在安装kafka的时候注意配置日志路径,就可以完成消息持久化了。这样在消费者异常期间,有生产者发送的消息也就持久化kafka的日志中了,即便是kafka关闭再启动后,消费者也可以消费在异常期间生产者发送的消息,这就表明stream+kafka集成成功了,并且实现了消息的持久化,可以正常发送和消费消息了。

相关文章: