目录
因为stream是一个消息驱动微服务框架,借助binder插件与第三方消息组件进行信息交互的,因此使用大致过程与stream+rocketmq是一样的,不同的是在配置信息上可能有所不同,因为rocketmq是阿里巴巴提供维护的,并不是stream官方维护的。
(1)引入依赖:
A、引入Spring Cloud管理依赖:
B、引入Spring Cloud Alibaba管理依赖:
C、引入Spring Cloud Stream管理依赖:
D、引入kafka依赖:
(2)修改配置文件:
A、生产者配置文件:
B、消费者配置文件:
消费者配置rabbitmq的信息与生产者配置rabbitmq的信息几乎是完全一样的,只有bindings中对应的名称不一样而已,并且增加了一个分组group,保证集群部署,保证一条消息只让一个分组内的一个实例消费。
(3)创建接口:
A、生产者接口:
创建一个自定义生产者的binding对应的接口,在接口中写一个out()方法,返回值为MessageChannel,并在方法上添加注解Output,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。
B、消费者接口:
创建一个自定义消费者的binding对应的接口,在接口中写一个input()方法,返回值为SubscribableChannel,并在方法上添加注解input,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。
(4)修改启动类:
在启动类上,绑定刚刚创建的自定义binding对应的那个接口,如果项目中有多个接口,那么就绑定多个接口。
(5)编写代码:
A、发送消息代码:
首先通过Autowired创建自定义的接口对象,然后在方法体中,通过自定义接口对象调用output()方法发送消息。
B、消费消息代码:
创建一个实体类,类上添加service注解,在类中创建一个方法,方法有一个string类型的参数(该参数就是消息体),方法上添加StreamListener,并指定是哪个input。
(6)启动测试:
启动消息生产者、消息消费者,访问接口,然后就会显示发送消息成功,在消费者中也会消费刚才发送的消息。在kafka-manager的控制台中可以看到在topic的list列表中会看到对应的topic,并且kafka是将消息数据持久化到日志中的,因此在安装kafka的时候注意配置日志路径,就可以完成消息持久化了。这样在消费者异常期间,有生产者发送的消息也就持久化kafka的日志中了,即便是kafka关闭再启动后,消费者也可以消费在异常期间生产者发送的消息,这就表明stream+kafka集成成功了,并且实现了消息的持久化,可以正常发送和消费消息了。