概述:

mq--activeMQ

作用:

  1. 解决业务耦合问题,实现异步消息处理
  2. 解决高并发消息处理问题

使用原则:

    不需要立即返回结果的,可以异步处理的。就可以用MQ方式。

    但如果需要立即返回结果,必须同步处理的,就得用Webservice。

 

ActiveMQ的下载安装运行

activemq的不同版本对jdk有要求:

  • 5.15.x以后的版本需要jdk1.8
  • 5.14之前的版本,只需要jdk1.7(1.7的版本也不能太老)

下载地址: http://activemq.apache.org/components/classic/download/

mq--activeMQ

 

安装 :

解压到一个目录

mq--activeMQ

  • bin:可执行文件
  • conf:配置
  • data:activemq数据持久化存储地方kaha
  • docs:文档
  • example:例子
  • lib:软件运行的jar
  • webapps:在线控制台
  • activemq-all.jar:开发jar

运行 :

根据操作系统和jdk的位数选择该命令执行:

mq--activeMQ

第一次启动会初始化。

控制台:

http://localhost:8161

mq--activeMQ

登录的用户名和密码:admin/admin

点击输入 用户名和密码

mq--activeMQ

mq--activeMQ

项目内配置端口地址

mq--activeMQ

 

ActiveMQ 中的生产者和消费者模型两种消息结构

ActiveMQ使用的是标准的生产者和消费者模型。

ActiveMQ支持两种消息结构:

  • Queue:队列消息

mq--activeMQ

  • Topic:话题消息-订阅消息

mq--activeMQ

mq--activeMQ

小结:

  1. 队列消息:一个消息只能由一个消费者消费,一个消费者可以消费多个消息
  2. 话题消息(订阅消息):一个消息可以由多个消费者消费,但前提是必须先订阅类该消息才行。

Spring整合方式实现生产者和消费者

jar包依赖

  <!-- activemq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.5</version>
        </dependency>
        <!-- spring整合MQ -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>

 

直接配置Spring配置文件:配置mq的连接、模版对象

mq--activeMQ

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
		http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
		http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate 的消息生产者 start-->

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!--Spring JmsTemplate 的消息生产者 end-->
</beans>

生产者

package cn.bufanli.controller;

import cn.bufanli.pojo.DeviceInfo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @author BuShuangLi
 * @date 2019/1/21
 */
@RestController
@RequestMapping("test")
public class TestController {

	//注入模版对象
	//注入queue的消息模版对象
	@Autowired
	@Qualifier("jmsQueueTemplate")//必须根据名字注入
	private JmsTemplate jmsQueueTemplate;
	//注入topic的消息模版对象
	@Autowired
	@Qualifier("jmsTopicTemplate")//必须根据名字注入
	private JmsTemplate jmsTopicTemplate;

	@RequestMapping("producerTest ")
	public void producerTest() {
		//循环发送10条消息
		for (int i = 0; i < 10; i++) {
			//发queue的消息
			//参数1:队列的名字,取的时候要用
			//参数2:消息创建者(spring封装对象)
			final int j = i;
			//匿名内部类写法
			jmsQueueTemplate.send("spring.queue.hello", new MessageCreator() {
				@Override
				public Message createMessage(Session session) throws JMSException {
					TextMessage message = session.createTextMessage("Queue消息:spring整合后的helloworld!" + j);
					return message;
				}
			});
			//lambda表达式写法
			jmsQueueTemplate.send("spring.queue.hello.lambda", s -> {
				Message message = s.createMessage();
				message.setStringProperty("测试Queue消息key--lambda","测试value");
				return message;
			});
		};
		//发t的消息
		jmsTopicTemplate.send("spring.topic.hello", new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage("Topic消息:spring整合后的helloworld!");
				return message;
			}
		});
		jmsTopicTemplate.send("spring.topic.hello.lambda",s->{
			Message message = s.createMessage();
			message.setStringProperty("Topic消息Lambda","测试Topiclambda");
			return message;
		});
	}


}

消费者

mq--activeMQ

第一步:先编写消息处理对象(消费者)

目标:弄两个队列消费者对象和两个话题的消费者对象

未完----

 

相关文章: