Spring Cloud Stream

Spring Cloud Stream 是什么

  • ⼀款用于构建消息驱动的微服务应用程序的轻量级框架

特性

  • 声明式编程模型,通过注解配置来声明我的这个系统要收发声明消息,这些消息有什么样的特性
  • 引入多种概念抽象,不同的消息中间件提供的接口、特性是不一样的,Spring Cloud Stream 给它做了一层统一的封装抽象
    • 发布订阅、消费组、分区
  • 支持多种消息中间件
    • RabbitMQ、Kafka ……

Spring Cloud Stream 的⼀些核心概念

Binder
在 Spring Cloud Stream 应用程序跟mq之间的抽象,它把不同的消息中间件统一的封装成binder的类型。有了这些binder之后,应用程序只需要跟binder打交道,并不需要去关注底层的细节。如果需要使用到消息中间件特有的特性,Spring Cloud Stream 给我们提供了一些配置,让我们去做一个定制。如果不需要特殊化的配置,直接设置成一样的就行了。

  • RabbitMQ

  • Apache Kafka

  • Kafka Streams

  • Amazon Kinesis

  • RocketMQ

  • ……
    认识 Spring Cloud Stream

Binding

  • 应用中生产者、消费者与消息系统之间的桥梁 通过注解的方式去定义一个接口
    • @EnableBinding

• @Input / SubscribableChannel
在这个接口里面,我的要订阅的消息,用@Input 注解去定义一个方法,返回值是SubscribableChannel
• @Output / MessageChannel
在这个接口里面,我的要发送的消息,用@Output 注解去定义一个方法,返回值是MessageChannel

有了这样一个接口在上面添加注解之后,我们使用@EnableBinding这个注解,告诉我的应用程序,去EnableBinding哪些接口,Spring Cloud Stream 会把这些接口变成对应的bean,注入进来。使用注入的bean,进行消息的发送和接收。

消费组

  • 对同⼀消息,每个组中都会有⼀个消费者收到消息
    比如说:我有一个a系统,它发送一个消息到mq上面,然后,有bcd三个系统,都希望收到这个消息,但是,我希望bcd这个系统每个系统是个集群,集群中只有一台消费者收到这个消息,这个时候,就可以做出三个消费组,一条消息上来之后,每个消费组都会有一个消费者收到这个消息。这个时候,不同的系统都可以收到这个消息,而且仅处理一次。消息重复之类的问题,需要我们自己处理。

分区
认识 Spring Cloud Stream
不同消息的生产者,向不同分区投递消息,在大的一个环境下,可以看成消息是无序的。在partition中,消息可以看成有序的,而且partition每次投个一个相同的消费者消费。需要注意的是,在消息的消费者里要考虑,万一消息有乱序的情况发生,我们要去如何处理。

如何发送与接收消息

在添加input和output注解的接口,Spring Cloud Stream 会把它变成bean注入进来,注入进来之后。
生产消息

  • 使用MessageChannel 中的 send() 发生一个消息
  • 也可以在方法上加入@SendTo注解 这个方法的返回值就会发送到特定的消息队列里面去

消费消息

  • 在一个方法上面添加 @StreamListener 注解在里面去指定 要去消费那个消息队列
    • @Payload / @Headers / @Header
    Payload :消费的消息的消息体
    Headers :消息的头信息
    Header:消息的头信息
    其他说明

  • 可以使用 Spring Integration

相关文章: