项目路径:https://github.com/zhaopeng01/springboot-study/tree/master/study_15
序言
用来在服务和服务之间进行异步通信的一种技术,采用TCP通信协议,
为了进一步提高网站性能,提高网站并发能力,提高网站可用性,可以使用mq消息中间件进行流量削峰,异步通信,任务的异步处理,服务解耦合等
安装
去官网下载tar.gz包,放到自己喜欢的地方去
地址:下载5.15.9
然后进入路径:apache-activemq-5.15.9/bin/macosx
我的路径是:/Users/zhaopeng/develop/mq/apache-activemq-5.15.9/bin/macosx
启动并访问ActiveMq
通过命令启动:
./activemq start
在浏览器中输入URL: http://localhost:8161/
点击Manager ActiveMQ boker输入用户名:admin密码admin
然后就可以看到下图,就代表启动ok了
解释下上面图片中控制台这些按钮的基本信息:
Home:查看 ActiveMQ 的常见信息
Queues:查看 ActiveMQ 的队列信息
Topics:查看 ActiveMQ 的主题信息
Subscribers:查看主题的订阅者信息
Connections:查看 ActiveMQ 客户端的连接信息
Network:查看 ActiveMQ 的网络信息
Scheduled:查看 ActiveMQ 的定时任务
Send:用于通过表单方式向队列或者主题发送具体的消息
正文
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
点对点发送消息
这个队列是不需要我们提前定义好的,它会在我们需要的时候
动态的创建。
package com.zyc.active;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* @description: 点对点发送消息
*
* @author zhaopeng
* @date 2019/4/9
*/
@Component
public class O2OSend {
@Autowired
private JmsTemplate jmsTemplate;
public void send(String message) {
jmsTemplate.convertAndSend("zyc", message);
}
}
点对点接收消息
package com.zyc.active;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* @description: 点对点接收消息
*
* @author zhaopeng
* @date 2019/4/9
*/
@Component
public class O2OReceive {
@JmsListener(destination = "zyc")
public void receive(String message) {
System.out.println("收到的 msg 是:" + message);
}
}
启动类
package com.zyc;
import com.zyc.active.O2OSend;
import com.zyc.active.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
public class Study15Application {
@Autowired
O2OSend send;
@Autowired
Publisher publisher;
@RequestMapping("/sendQueue")
public void sendQueue() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (int i = 0; i < 10; i++) {
send.send("发送消息----zyc-----" + i);
}
stopWatch.stop();
System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
}
public static void main(String[] args) {
SpringApplication.run(Study15Application.class, args);
}
}
结果
发送一万条消息运行后需要的时间挺久的:10955 ms
可见性能寡气,
发布/订阅模式
发布消息
package com.zyc.active;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
/**
* @author zhaopeng
* @description: 发布话题
* @date 2019/4/9
*/
@Service
public class Publisher {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void publish(String destinationName, String message) {
Destination destination = new ActiveMQTopic(destinationName);
System.out.println("============>>>>> 发布topic消息 " + message);
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
订阅消息
package com.zyc.active;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
/**
* @description: 订阅话题
*
* @author zhaopeng
* @date 2019/4/9
*/
@Service
public class Subscriber {
@JmsListener(destination = "zycTopic", containerFactory = "myJmsContainerFactory")
public void subscribe(String text) {
System.out.println("-----> 收到订阅msg为:" + text);
}
}
订阅配置
在发布/订阅模式中,对消息的监听需要对containerFactory进行以下配置
package com.zyc.active;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
/**
* @author zhaopeng
* @description: ActiveMq 配置文件
* @date 2019/4/9
*/
@Configuration
@EnableJms
public class ActiveMQConfiguration {
/**
* 发布订阅
*/
@Bean
JmsListenerContainerFactory<?> myJmsContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}
启动类
package com.zyc;
import com.zyc.active.O2OSend;
import com.zyc.active.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
public class Study15Application {
@Autowired
O2OSend send;
@Autowired
Publisher publisher;
@RequestMapping("/sendQueue")
public void sendQueue() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (int i = 0; i < 10; i++) {
send.send("发送消息----zyc-----" + i);
}
stopWatch.stop();
System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
}
@RequestMapping("/sendTopic")
public void sendTopic() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (int i = 0; i < 10; i++) {
publisher.publish("zycTopic", "Topic Message " + i);
}
stopWatch.stop();
System.out.println("发送消息耗时: " + stopWatch.getTotalTimeMillis());
}
public static void main(String[] args) {
SpringApplication.run(Study15Application.class, args);
}
}
结果
模拟应用
按照以上点对点,发布/订阅模式,在SpringBoot中很容易就实现类两种模式的消息发送和接收。但是jms具体的应用场景是在不同的应用程序之间,生产者和消费者往往是在不同的应用中的。此外,以上例子中的消息我们只发送
字符串,其实还可以发送Object类型的消息,甚至可以使用messageCreator自定义消息的转换,而不使用convertAndSend方法默认转换。
多个应用程序之间发送消息
1.先使用一个只有发送者,没有消费者或订阅者的应用发送两类消息各十条
然后打开http://localhost:8161可以看到
两类都增有十条消息入列,但只有queues中还存留10条消息。
现在我们启动包含消费者和订阅者的应用程序,然后可以看到果然,只有
点对点消费者收到了queues中的消息。
这说明订阅者接收topic是需要在topic发布之前订阅;而生产/消费模式下,消息发出后会存放在队列中,等待消费者消费。
2.我们启动两个包含订阅者和消费者的程序,再发布消息
两个订阅者都收到 消息0~9, 而消费者中,一个收到消息 1、3、5、7、9,另一个收到0、2、4、6、8。
这说明有多个消息接收者时,生产/消费模式下多个消费者会轮流消费队列中的消息,而pub/sub模式下所有订阅者都会得到所有的消息。
以上就是在多个应用程序之间对发布/订阅模式和生产/消费模式的不同特点进行的一些小的验证;