说明

对Spring Boot 和 Spring Integration的整合,通过Spring Cloud Stream能够简化消息中间件使用的复杂难度!让业务人员更多的精力能够花在业务层面

consumer

1.创建一个一个项目名为spring-cloud-stream-consumer

2.引入pom依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

3.yml配置

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
server:
  port: 8081

5.创建消息监听类

//实现对定义了多个@Input和@Output的接口实现对通道的绑定 Sink定义了@Input 我们自己处理时是自己定义接口
@EnableBinding(Sink.class)
public class SkinReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //对input的消息监听处理
    @StreamListener(Sink.INPUT)
    public void receiver(Object message){
        logger.info(message.toString());
    }
}

6.启动rabbitmq

7.启动项目

8.通过mq管理页面发送消息

Spring Cloud Stream(十三)

 

Spring Cloud Stream(十三)

 

 控制台打印 表示成功接收到消息

Spring Cloud Stream(十三)

核心概念

绑定器

应用程序与消息中间件的抽象层。应用程序中间件的解耦。应用程序不需要考虑用的什么类型的消息中间件。当我们需要更换消息中间件 只需要替换绑定器

发布订阅

spring cloud stream 完全遵循发布订阅模式 当一条消息被发布到消息中间件后 将会以topic主题模式进行广播,消费者对订阅的topic主题进行相应的逻辑处理。topic是spring cloud stream的一个抽象概念,不同消息中间件topic概念可能不同 rabbitMq对应exchage

如rabbitMQ的topic

Spring Cloud Stream(十三)

 

 发布订阅模式能够有效避免点对点的耦合 当一种消息要增加一种处理方式时只需要增加一个消息订阅者

消费组

一般我们的消费组都会集群部署 但是我们再集群部署的情况下 会形成多个订阅者 导致消息被消费多次, 消费组则是解决一个消息只能被一个实例消费者消费

消费分区

指定统一特征的消息被指定服务实例消费 spring cloud stream消费分区提供通用的抽象实现 使不支持分区的中间件也能支持消费分区

自定义输入和输出

定义输入

Sink 是spring cloud stream 的默认实现 我们可以通过查看源码

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

通过@Input注册参数为通道名字 同时需要返回SubscribableChannel

我们通过参考Sink定义一个输入通道 比如处理订单保存的通道

1.定义第一个通道

public interface OrderMQInputChannel {
    String saveOrderChannelName="saveOrder";//定义通道的名字
    @Input(saveOrderChannelName)//定义为输入通道
    public SubscribableChannel saveOrder();
}

2.绑定通道并监听

//通过绑定器 对OrderMQInputChannel通道进行绑定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Object message){
        logger.info(message.toString());
    }
}

定义输出

1.创建一个测试消费提供者的项目

 Spring Cloud Stream(十三)

2.引入pom依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

2.yml配置文件配置

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
server:
  port: 8082

3.定义一个输出通道

public interface OrderMQOutputChannel {
    String saveOrderChannelName="saveOrder";
    @Output(saveOrderChannelName)//定义输出管道的名字
    MessageChannel saveOrder();
}

4.绑定通道

@EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
public class OrderChannelBindConfig {
}

5.添加测试contorller

@Controller
public class TestContorller {
    @Autowired
    OrderMQOutputChannel orderMQOutputChannel;
    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
        //发送一条保存订单的命令
       return  orderMQOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build());
    }
}

或者

@Controller
public class TestContorller {

    //直接注入对应通道是的实例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
        //发送一条保存订单的命令
        return  messageChannel.send(MessageBuilder.withPayload("fff").build());
    }

}

 

 

6.访问

http://127.0.0.1:8082/saveOrder

Spring Cloud Stream(十三)

7.consumer打印 表示消息被消费

Spring Cloud Stream(十三)

 spring intergration原生支持

spring cloud stream 是通过spring boot和spring intergreation的整合 所以也可以使用原生的用法实现相同的功能

provider

@EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
public class OriginalOrderMQOutPutChannelService {
    //定义2秒发送一次消息
    @Bean
    @InboundChannelAdapter(value = OrderMQOutputChannel.saveOrderChannelName, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Date> timerMessageSource() {
        return () -> new GenericMessage<>(new Date());

    }
}

consumer

//通过绑定器 对OrderMQInputChannel通道进行绑定
@EnableBinding(OrderMQInputChannel.class)
public class OriginalOrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);

    //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
    @ServiceActivator(inputChannel = OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Object message) {
        logger.info(message.toString());
    }

    //定义消息转换器 转换saveOrderChannelName 通道的的消息
    @Transformer(inputChannel = OrderMQInputChannel.saveOrderChannelName, outputChannel = OrderMQInputChannel.saveOrderChannelName)
    public Object transform(Date message) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);

    }
}

启动之后conusmer2秒则会受到一条消息 更多用法查看spring intergration文档

消息转换 

通过上面我们可以看到原生通过@Transformer实现消息转换 spring cloud stream 只需要定义消息通道的消息类型

spring.cloud.stream.bindings.[inputname].content-type=application/json
spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          content-type: application/json
server:
  port: 8081

 

消费者

//通过绑定器 对OrderMQInputChannel通道进行绑定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Order order){
       
        logger.info(order.getId()+"-"+order.getOrderCode());
    }
}

消息提供者

@Controller
public class TestContorller {

    //直接注入对应通道是的实例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
       com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
       order.setId(1L);
       order.setOrderCode("201901020001");
        //发送一条保存订单的命令
        return  messageChannel.send(MessageBuilder.withPayload(order).build());
    }

}

 

Spring Cloud Stream(十三)

 

消息反馈

用于将消息交给别的应用处理 处理后再回传  或者异步请求 接收处理结果

provider

public interface OrderMQOutputChannel {
    String saveOrderChannelName="saveOrder";
    String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
    @Output(saveOrderChannelName)//定义输出管道的名字
    MessageChannel saveOrder();

    @Input(saveOrderCallbackChannelName)//定义为输入通道
    public SubscribableChannel saveOrderCallback();
}
@EnableBinding(OrderMQOutputChannel.class) //绑定通道OrderMQOutputChannel
public class OrderChannelBindConfig {
    private static Logger logger = LoggerFactory.getLogger(OrderMQOutputChannel.class);
    //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
    @StreamListener(OrderMQOutputChannel.saveOrderCallbackChannelName)
    public void receiver(boolean boo){
        logger.info(String.valueOf(boo));
    }
}

consumer

public interface OrderMQInputChannel {
    String saveOrderChannelName="saveOrder";//定义通道的名字
    String saveOrderCallbackChannelName="saveOrderCallback";//定义回调通道的名字
    @Input(saveOrderChannelName)//定义为输入通道
    public SubscribableChannel saveOrder();
}
//通过绑定器 对OrderMQInputChannel通道进行绑定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //对OrderMQInputChannel.saveOrderChannelName的消息监听处理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    @SendTo(OrderMQInputChannel.saveOrderCallbackChannelName)//反馈的通道名字
    public boolean receiver(Order order){

        logger.info(order.getId()+"-"+order.getOrderCode());
        return true;
    }
}

消息分组

多实例情况下 只需要指定spring.cloud.stream.bindings.[channelname].group=gorupname 当同一组实例对同一个主题的消息只能会有一个实例消费

1.测试 创建2个配置文件 分别为

application-peer1.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          group: groupA
          content-type: application/json
server:
  port: 8081

application-peer2.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          group: groupA
          content-type: application/json
server:
  port: 8083

通过启动2个消费者

java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1

java -jar /Users/liqiang/Desktop/java开发环境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2

Spring Cloud Stream(十三)

只要一个实例消费了 

消费分区

再某些场景 我需要指定某一类消息只能被哪些实例消费

消费者

application-peer1.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      instanceCount: 2 #跟分区一起使用 有多少实例
      instanceIndex: 0 #分区当前实例编号 从0开始
      bindings:
        saveOrder:
          group: streamConsumer
          content-type: application/json
          consumer:
            partitioned: true #开启消息分区的功能

server:
  port: 8081

application-peer2.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      instanceCount: 2 #跟分区一起使用 有多少实例
      instanceIndex: 1 #当前实例编号 从0开始
      bindings:
        saveOrder:
          group: streamConsumer
          content-type: application/json
          consumer:
            partitioned: true #开启消息分区的功能

server:
  port: 8083

生产者

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          producer:
            partitionKeyExpression: '0' #表示只有实例索引为0的才能收到消息 支持SpEL表达式
            partitionCount: 2
server:
  port: 8082

当启动2个消费者 和生产者 当前生产者 生产的消息只能被实例编号为0的消费

 

这里限制死了当前实例生产的消息被某个实例消费。如果我们需要指定 当前生产者生产的某一类服务被指定实例消费呢可以通过SpEL表达式设置

生产者yml

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看书331页
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          producer:
            partitionKeyExpression: headers['partitionKey'] #SpEL表达式 通过读取消息hearder的partitionKey属性动态指定
            partitionCount: 2 #消息分区数量
server:
  port: 8082

消息生产通过header动态指定

package com.liqiang.springcloudstreamprovider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class TestContorller {

    //直接注入对应通道是的实例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;
    private static  int index=0;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
       com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
       order.setId(1L);
       order.setOrderCode("201901020001");

        //发送一条保存订单的命令
        return  messageChannel.send(MessageBuilder.withPayload(order).setHeader("partitionKey",(index++)%2==0?0:1).build());
    }

}

 

相关文章:

  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2022-01-08
  • 2021-10-16
  • 2021-11-28
相关资源
相似解决方案