消息中间件理解:

消息中间件实现系统之间解耦、异步访问。

在微服务架构下,各个业务模块都是独立的系统。整个业务由各个子系统链式调用完成。

消息中间件就充当了各个子系统间的缓冲角色。系统不直接调用系统,而是调用中间件。

适用于不需要即时响应的业务场景。

 

优点:1.上下游系统间解耦,避免互相影响。2.消息中间件队列管理请求,具有流量削峰,减少系统压力的作用。

缺点:增加业务系统复杂度,额外的中间件相关问题,及业务问题排查难度。

 

消息中间件主要功能:

1.发送/接收消息

2.集群/容错配置

3.高可用

4.持久化

5.延时/定时投递消息

6.确认机制

 

 

spring整合ActiveMQ,配置一个不仅有消息发送与接收,

也有消息生产者指定消费者回复的队列(目的地),并监听到回复消息的功能。

spring整合ActiveMQ,实现消息生产者指定消费者回复的队列(目的地),并监听到消费者回复的消息

 

一、配置消息连接工厂

1.连接工厂配置或则连接池工厂配置

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=10000"/>

<property name="optimizedAckScheduledAckInterval" value="10000" /><!-- ACK优化,每隔10秒批量确认一次-->

<property name="trustAllPackages" value="true"/><!-- 信任所有包下的类都可以序列化 -->

</bean>

<!--连接池-->

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">

<property name="connectionFactory" ref="targetConnectionFactory"/>

<property name="maxConnections" value="100"/>

</bean>

<!-- spring代理链接工厂-->

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">

<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>

</bean>

2.队列、主题配置(创建就不说了)

<!--Topic-->

<bean id="msgTopic" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg index="0" value="com.han.test.topic"/>

</bean>

<!--Queue-->

<bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">

<constructor-arg index="0" value="com.han.test.queue"/>

</bean>

 

二、配置生产者

1.关联连接工厂

2.关联队列或主题

3.消息转换器配置

<!-- 消息转换器 -->

<bean id="xstreamMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller"/>

<bean id="msgConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter" >

<property name="marshaller" ref="xstreamMarshaller"/>

<property name="unmarshaller" ref="xstreamMarshaller"/>

</bean>

4.Jms模板参数:发布模式、.发送模式、持久化、优先级、过期时间等

<!-- 消息发送Jms模板 -->

<bean id="msgJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="connectionFactory"/>

<property name="pubSubDomain" value="true"/><!--发布/订阅模式 -->

<property name="explicitQosEnabled" value="true"/><!--是否会过期 -->

<property name="timeToLive" value="600000"/><!-- 消息存活时长 -->

<property name="deliveryMode" value="2" /><!--1-非持久化 2-持久化 -->

<property name="priority" value="2"/><!-- 优先级 -->

<property name="messageConverter" ref="msgConverter"/>

<property name="defaultDestination" ref="msgTopic"/>

</bean>

5.消息发送工具类

<!-- 发送工具类 -->

<bean id="msgSend" class="com.hwq.middleware.MessageSender">

<property name="msgJmsTemplate" ref="msgJmsTemplate"/>

<property name="replayQueue" ref="msgQueue"/>

<property name="messageConverter" ref="msgConverter"/>

</bean>

 

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.jms.support.converter.MessageConverter;

import javax.annotation.Resource;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

 

public class MessageSender {

public JmsTemplate msgJmsTemplate;

public Destination replayQueue;

private MessageConverter messageConverter;

public void setMsgJmsTemplate(JmsTemplate msgJmsTemplate) {

this.msgJmsTemplate = msgJmsTemplate;

}

public void setReplayQueue(Destination replayQueue) {

this.replayQueue = replayQueue;

}

public void setMessageConverter(MessageConverter messageConverter) {

this.messageConverter = messageConverter;

}

//发送消息

public void sendMessage(){

final Person p = new Person();

p.setFirstName("韩");

p.setLastName("伟其");

msgJmsTemplate.send(new MessageCreator() {

public Message createMessage(Session session) throws JMSException {

Message message=session.createObjectMessage(p);//传输序列化信息

message.setJMSReplyTo(replayQueue);//指定消费者回复的队列

return message;

}

});

}

}

三、配置消费者

1.关联连接工厂

2.关联队列或主题

3.配置消息转换器

4.消息接收/监听工具类

<!-- 配置消费者 -->

<!-- 消息监听器 -->

<bean id="webMessageListener" class="com.hwq.middleware.MyMessageListener" >

<property name="converter" ref="msgConverter"/>

</bean>

<bean id="webMessageListener2" class="com.hwq.middleware.MyMessageListener2" >

</bean>

<!-- 消息监听容器 -->

<bean id="jmsWebContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="connectionFactory" />

<property name="destination" ref="msgTopic" />

<property name="messageListener" ref="webMessageListener" />

</bean>

import org.springframework.jms.listener.adapter.MessageListenerAdapter;

import org.springframework.jms.support.converter.MessageConverter;

import javax.jms.*;

//消费者接收信息并回复

public class MyMessageListener extends MessageListenerAdapter {

private MessageConverter converter;

public void onMessage(Message msg, Session session) {

try {

ObjectMessage obj=(ObjectMessage)msg;

Person p=(Person)obj.getObject();

System.out.println("MyMessageListener接收到消息:"+p.toString());

//获取回复的队列,并发送

MessageProducer replay=session.createProducer(msg.getJMSReplyTo());

replay.send(session.createTextMessage("回复信息,已收到:"+p.toString()));

} catch (JMSException e1) {

e1.printStackTrace();

}

}

public void setConverter(MessageConverter converter) {

this.converter = converter;

}

 

}

<!-- 消息监听容器2 -->

<bean id="jmsWebContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="connectionFactory" />

<property name="destination" ref="msgQueue" />

<property name="messageListener" ref="webMessageListener2" />

</bean>

 

 

import org.springframework.jms.listener.adapter.MessageListenerAdapter;

import javax.jms.*;

//生产者接收回复的信息

public class MyMessageListener2 extends MessageListenerAdapter {

public void onMessage(Message msg, Session session) {

try {

TextMessage text=(TextMessage)msg;

System.out.println("接收消息:"+text.getText());

} catch (Exception e1) {

e1.printStackTrace();

}

}

}

 

用到的Person对象参数

import java.io.Serializable;

 

public class Person implements Serializable {

private static final long serialVersionUID = 1L;

private String firstName;

private String lastName;

public String toString() {

return lastName + " " + firstName;

}

public String getFirstName() {

return firstName;

}

public void setFirstName(String firstName) {

this.firstName = firstName;

}

public String getLastName() {

return lastName;

}

public void setLastName(String lastName) {

this.lastName = lastName;

}

}

测试结果正常

spring整合ActiveMQ,实现消息生产者指定消费者回复的队列(目的地),并监听到消费者回复的消息

spring整合ActiveMQ,实现消息生产者指定消费者回复的队列(目的地),并监听到消费者回复的消息

spring整合ActiveMQ,实现消息生产者指定消费者回复的队列(目的地),并监听到消费者回复的消息

spring整合ActiveMQ,实现消息生产者指定消费者回复的队列(目的地),并监听到消费者回复的消息

确认模式简介

1.Session.AUTO_ACKNOWLEDGE

消费者调用Message.acknowledge()返回或者onMessage返回时,自动确认。

2.Session.CLIENT_ACKNOWLEDGE

消费者只能调用Message.acknowledge()确认。确认所有已消费的消息。

3.Session.DUPS_ACKNOWLEDGE

消费者只能通过session.commit()提交确认,若生产者未标识该消息为重复 消息JMSRedelivered=true。则可一直被消费。

 

实现生产者指定消费者回复消息目的地。

生产者设置回复目的地JMSReplyTo(Destination),发送。

消费者获取到目的地,发送。

相关文章:

  • 2021-07-21
  • 2022-12-23
  • 2021-12-19
  • 2022-03-02
  • 2018-12-01
  • 2022-12-23
  • 2021-08-08
猜你喜欢
  • 2019-07-15
  • 2022-12-23
  • 2022-01-25
  • 2022-01-03
  • 2021-09-28
  • 2021-08-25
  • 2022-12-23
相关资源
相似解决方案