项目路径:https://github.com/zhaopeng01/springboot-study/tree/master/study_15

序言

用来在服务和服务之间进行异步通信的一种技术,采用TCP通信协议,
为了进一步提高网站性能,提高网站并发能力,提高网站可用性,可以使用mq消息中间件进行流量削峰,异步通信,任务的异步处理,服务解耦合等

安装

去官网下载tar.gz包,放到自己喜欢的地方去
地址:下载5.15.9
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)
然后进入路径: apache-activemq-5.15.9/bin/macosx
我的路径是: /Users/zhaopeng/develop/mq/apache-activemq-5.15.9/bin/macosx

启动并访问ActiveMq

通过命令启动: ./activemq start
在浏览器中输入URL: http://localhost:8161/
点击 Manager ActiveMQ boker 输入用户名:admin 密码admin
然后就可以看到下图,就代表启动ok了
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

解释下上面图片中控制台这些按钮的基本信息:

    Home:查看 ActiveMQ 的常见信息
    Queues:查看 ActiveMQ 的队列信息
    Topics:查看 ActiveMQ 的主题信息
    Subscribers:查看主题的订阅者信息
    Connections:查看 ActiveMQ 客户端的连接信息
    Network:查看 ActiveMQ 的网络信息
    Scheduled:查看 ActiveMQ 的定时任务
    Send:用于通过表单方式向队列或者主题发送具体的消息

正文

依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

点对点发送消息

这个队列是不需要我们提前定义好的,它会在我们需要的时候动态的创建

package com.zyc.active;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * @description: 点对点发送消息
 *
 * @author zhaopeng
 * @date 2019/4/9
 */
@Component
public class O2OSend {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void send(String message) {
        jmsTemplate.convertAndSend("zyc", message);
    }
}

点对点接收消息

package com.zyc.active;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @description: 点对点接收消息
 *
 * @author zhaopeng
 * @date 2019/4/9
 */
@Component
public class O2OReceive {
    @JmsListener(destination = "zyc")
    public void receive(String message) {
        System.out.println("收到的 msg 是:" + message);
    }
}

启动类

package com.zyc;

import com.zyc.active.O2OSend;
import com.zyc.active.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@SpringBootApplication
@RestController
public class Study15Application {

    @Autowired
    O2OSend send;
    @Autowired
    Publisher publisher;

    @RequestMapping("/sendQueue")
    public void sendQueue() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        for (int i = 0; i < 10; i++) {
            send.send("发送消息----zyc-----" + i);
        }
        stopWatch.stop();
        System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
    }

    public static void main(String[] args) {
        SpringApplication.run(Study15Application.class, args);
    }

}

结果

发送一万条消息运行后需要的时间挺久的:10955 ms
可见性能寡气,SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

发布/订阅模式

发布消息

package com.zyc.active;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;

/**
 * @author zhaopeng
 * @description: 发布话题
 * @date 2019/4/9
 */
@Service
public class Publisher {

    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void publish(String destinationName, String message) {
        Destination destination = new ActiveMQTopic(destinationName);
        System.out.println("============>>>>> 发布topic消息 " + message);
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

订阅消息

package com.zyc.active;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

/**
 * @description: 订阅话题
 *
 * @author zhaopeng
 * @date 2019/4/9
 */
@Service
public class Subscriber {

    @JmsListener(destination = "zycTopic", containerFactory = "myJmsContainerFactory")
    public void subscribe(String text) {
        System.out.println("-----> 收到订阅msg为:" + text);
    }
}

订阅配置

在发布/订阅模式中,对消息的监听需要对containerFactory进行以下配置

package com.zyc.active;

import javax.jms.ConnectionFactory;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

/**
 * @author zhaopeng
 * @description: ActiveMq 配置文件
 * @date 2019/4/9
 */
@Configuration
@EnableJms
public class ActiveMQConfiguration {
    /**
     * 发布订阅
     */
    @Bean
    JmsListenerContainerFactory<?> myJmsContainerFactory(ConnectionFactory connectionFactory) {
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

}

启动类

package com.zyc;

import com.zyc.active.O2OSend;
import com.zyc.active.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@SpringBootApplication
@RestController
public class Study15Application {

    @Autowired
    O2OSend send;
    @Autowired
    Publisher publisher;

    @RequestMapping("/sendQueue")
    public void sendQueue() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        for (int i = 0; i < 10; i++) {
            send.send("发送消息----zyc-----" + i);
        }
        stopWatch.stop();
        System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
    }

    @RequestMapping("/sendTopic")
    public void sendTopic() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        for (int i = 0; i < 10; i++) {
            publisher.publish("zycTopic", "Topic Message " + i);
        }
        stopWatch.stop();
        System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
    }

    public static void main(String[] args) {
        SpringApplication.run(Study15Application.class, args);
    }

}

结果

SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

模拟应用

按照以上点对点,发布/订阅模式,在SpringBoot中很容易就实现类两种模式的消息发送和接收。但是jms具体的应用场景是在不同的应用程序之间,生产者和消费者往往是在不同的应用中的。此外,以上例子中的消息我们只发送字符串,其实还可以发送Object类型的消息,甚至可以使用messageCreator自定义消息的转换,而不使用convertAndSend方法默认转换。

多个应用程序之间发送消息

1.先使用一个只有发送者,没有消费者或订阅者的应用发送两类消息各十条
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

然后打开http://localhost:8161可以看到
两类都增有十条消息入列,但只有queues中还存留10条消息。

SpringBoot(15) - - SpringBoot整合ActiveMq(整合)
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

现在我们启动包含消费者和订阅者的应用程序,然后可以看到果然,只有点对点消费者收到了queues中的消息。
这说明订阅者接收topic是需要在topic发布之前订阅;而生产/消费模式下,消息发出后会存放在队列中,等待消费者消费

SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

2.我们启动两个包含订阅者和消费者的程序,再发布消息
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)
SpringBoot(15) - - SpringBoot整合ActiveMq(整合)

两个订阅者都收到 消息0~9, 而消费者中,一个收到消息 1、3、5、7、9,另一个收到0、2、4、6、8。
这说明有多个消息接收者时,生产/消费模式下多个消费者会轮流消费队列中的消息,而pub/sub模式下所有订阅者都会得到所有的消息。
以上就是在多个应用程序之间对发布/订阅模式和生产/消费模式的不同特点进行的一些小的验证;

相关文章:

猜你喜欢
  • 2019-01-17
  • 2021-06-16
  • 2021-05-16
  • 2021-10-07
  • 2021-08-24
相关资源
相似解决方案