消息中间件理解:
消息中间件实现系统之间解耦、异步访问。
在微服务架构下,各个业务模块都是独立的系统。整个业务由各个子系统链式调用完成。
消息中间件就充当了各个子系统间的缓冲角色。系统不直接调用系统,而是调用中间件。
适用于不需要即时响应的业务场景。
优点:1.上下游系统间解耦,避免互相影响。2.消息中间件队列管理请求,具有流量削峰,减少系统压力的作用。
缺点:增加业务系统复杂度,额外的中间件相关问题,及业务问题排查难度。
消息中间件主要功能:
1.发送/接收消息
2.集群/容错配置
3.高可用
4.持久化
5.延时/定时投递消息
6.确认机制
spring整合ActiveMQ,配置一个不仅有消息发送与接收,
也有消息生产者指定消费者回复的队列(目的地),并监听到回复消息的功能。
一、配置消息连接工厂
1.连接工厂配置或则连接池工厂配置
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0)&maxReconnectDelay=10000"/>
<property name="optimizedAckScheduledAckInterval" value="10000" /><!-- ACK优化,每隔10秒批量确认一次-->
<property name="trustAllPackages" value="true"/><!-- 信任所有包下的类都可以序列化 -->
</bean>
<!--连接池-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="100"/>
</bean>
<!-- spring代理链接工厂-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
2.队列、主题配置(创建就不说了)
<!--Topic-->
<bean id="msgTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="com.han.test.topic"/>
</bean>
<!--Queue-->
<bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="com.han.test.queue"/>
</bean>
二、配置生产者
1.关联连接工厂
2.关联队列或主题
3.消息转换器配置
<!-- 消息转换器 -->
<bean id="xstreamMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller"/>
<bean id="msgConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter" >
<property name="marshaller" ref="xstreamMarshaller"/>
<property name="unmarshaller" ref="xstreamMarshaller"/>
</bean>
4.Jms模板参数:发布模式、.发送模式、持久化、优先级、过期时间等
<!-- 消息发送Jms模板 -->
<bean id="msgJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/><!--发布/订阅模式 -->
<property name="explicitQosEnabled" value="true"/><!--是否会过期 -->
<property name="timeToLive" value="600000"/><!-- 消息存活时长 -->
<property name="deliveryMode" value="2" /><!--1-非持久化 2-持久化 -->
<property name="priority" value="2"/><!-- 优先级 -->
<property name="messageConverter" ref="msgConverter"/>
<property name="defaultDestination" ref="msgTopic"/>
</bean>
5.消息发送工具类
<!-- 发送工具类 -->
<bean id="msgSend" class="com.hwq.middleware.MessageSender">
<property name="msgJmsTemplate" ref="msgJmsTemplate"/>
<property name="replayQueue" ref="msgQueue"/>
<property name="messageConverter" ref="msgConverter"/>
</bean>
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.converter.MessageConverter;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
public class MessageSender {
public JmsTemplate msgJmsTemplate;
public Destination replayQueue;
private MessageConverter messageConverter;
public void setMsgJmsTemplate(JmsTemplate msgJmsTemplate) {
this.msgJmsTemplate = msgJmsTemplate;
}
public void setReplayQueue(Destination replayQueue) {
this.replayQueue = replayQueue;
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
//发送消息
public void sendMessage(){
final Person p = new Person();
p.setFirstName("韩");
p.setLastName("伟其");
msgJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message=session.createObjectMessage(p);//传输序列化信息
message.setJMSReplyTo(replayQueue);//指定消费者回复的队列
return message;
}
});
}
}
三、配置消费者
1.关联连接工厂
2.关联队列或主题
3.配置消息转换器
4.消息接收/监听工具类
<!-- 配置消费者 -->
<!-- 消息监听器 -->
<bean id="webMessageListener" class="com.hwq.middleware.MyMessageListener" >
<property name="converter" ref="msgConverter"/>
</bean>
<bean id="webMessageListener2" class="com.hwq.middleware.MyMessageListener2" >
</bean>
<!-- 消息监听容器 -->
<bean id="jmsWebContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="msgTopic" />
<property name="messageListener" ref="webMessageListener" />
</bean>
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConverter;
import javax.jms.*;
//消费者接收信息并回复
public class MyMessageListener extends MessageListenerAdapter {
private MessageConverter converter;
public void onMessage(Message msg, Session session) {
try {
ObjectMessage obj=(ObjectMessage)msg;
Person p=(Person)obj.getObject();
System.out.println("MyMessageListener接收到消息:"+p.toString());
//获取回复的队列,并发送
MessageProducer replay=session.createProducer(msg.getJMSReplyTo());
replay.send(session.createTextMessage("回复信息,已收到:"+p.toString()));
} catch (JMSException e1) {
e1.printStackTrace();
}
}
public void setConverter(MessageConverter converter) {
this.converter = converter;
}
}
<!-- 消息监听容器2 -->
<bean id="jmsWebContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="msgQueue" />
<property name="messageListener" ref="webMessageListener2" />
</bean>
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import javax.jms.*;
//生产者接收回复的信息
public class MyMessageListener2 extends MessageListenerAdapter {
public void onMessage(Message msg, Session session) {
try {
TextMessage text=(TextMessage)msg;
System.out.println("接收消息:"+text.getText());
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
用到的Person对象参数
import java.io.Serializable;
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
private String firstName;
private String lastName;
public String toString() {
return lastName + " " + firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
}
测试结果正常
确认模式简介:
1.Session.AUTO_ACKNOWLEDGE
消费者调用Message.acknowledge()返回或者onMessage返回时,自动确认。
2.Session.CLIENT_ACKNOWLEDGE
消费者只能调用Message.acknowledge()确认。确认所有已消费的消息。
3.Session.DUPS_ACKNOWLEDGE
消费者只能通过session.commit()提交确认,若生产者未标识该消息为重复 消息JMSRedelivered=true。则可一直被消费。
实现生产者指定消费者回复消息目的地。
生产者设置回复目的地JMSReplyTo(Destination),发送。
消费者获取到目的地,发送。