一、创建Maven项目(pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dev</groupId>
  <artifactId>MessageQueen</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>MessageQueen</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- Unit testing -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>1.3</version>
    </dependency>
    <!-- ActiveMQ client and connection pooling -->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.14.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.14.5</version>
    </dependency>
    <dependency>
      <groupId>commons-pool</groupId>
      <artifactId>commons-pool</artifactId>
      <version>1.6</version>
    </dependency>
    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>4.1.8.RELEASE</version>
    </dependency>
    <!--
      The Spring configuration and the listener use org.springframework.jms.*
      (JmsTemplate, SingleConnectionFactory, DefaultMessageListenerContainer,
      SessionAwareMessageListener), which live in spring-jms. Declare it
      explicitly at the same version as the other Spring modules.
    -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.1.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.1.8.RELEASE</version>
    </dependency>
  </dependencies>
</project>
二、添加ActiveMQ属性配置文件activemq.properties
# ActiveMQ connection settings, loaded by spring-context.xml through
# <context:property-placeholder>. One key=value pair per line.
activemq.brokerurl=tcp://127.0.0.1:61616
activemq.username=**********
# NOTE: this key is spelled "acticemq" because spring-activemq.xml references
# ${acticemq.password}; if the spelling is corrected, change both places together.
acticemq.password=wuhanhongdev
activemq.pool.max=10
activemq.queenName=**********
三、添加Spring和ActiveMQ集成文件spring-context.xml、spring-activemq.xml
spring-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Root Spring context: loads activemq.properties, enables annotation-driven
  configuration for the com.dev.messagequeen package and pulls in the
  ActiveMQ bean definitions from spring-activemq.xml.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

    <!--
      Explicit "classpath:" prefix so the properties file is always resolved
      from the classpath, regardless of which ApplicationContext type loads
      this file (an un-prefixed location is resolved in a context-dependent way).
    -->
    <context:property-placeholder file-encoding="UTF-8" location="classpath:activemq.properties"/>

    <context:annotation-config/>
    <context:component-scan base-package="com.dev.messagequeen"/>

    <!-- Resolved relative to this file's location. -->
    <import resource="spring-activemq.xml"/>
</beans>
spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ActiveMQ / Spring JMS wiring: broker connection factory, connection pool,
  Spring-side connection factory, destination queue, JmsTemplate and the
  message listener container.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- ActiveMQ connection factory (talks to the broker directly) -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.brokerurl}"/>
        <property name="userName" value="${activemq.username}"/>
        <!-- The key is spelled "acticemq" to match activemq.properties. -->
        <property name="password" value="${acticemq.password}"/>
        <!-- Packages whose classes may be deserialized from ObjectMessage payloads -->
        <property name="trustedPackages">
            <list>
                <value>com.dev.messagequeen.model</value>
            </list>
        </property>
    </bean>

    <!-- ActiveMQ connection pool; stopped on context shutdown to release pooled connections -->
    <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="maxConnections" value="${activemq.pool.max}"/>
    </bean>

    <!--
      Spring-side connection factory. It wraps the pooled factory; the original
      wiring pointed at the raw connectionFactory, which left the pool unused.
    -->
    <bean id="springConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="poolConnectionFactory"/>
    </bean>

    <!-- Destination queue; its name comes from activemq.queenName -->
    <bean id="msgQueen" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${activemq.queenName}</value>
        </constructor-arg>
    </bean>

    <!-- JMS template used to send and receive messages -->
    <bean id="jmsTmp" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="springConnectionFactory"/>
        <property name="defaultDestinationName" value="${activemq.queenName}"/>
    </bean>

    <!-- Custom listener and the container that delivers messages from msgQueen to it -->
    <bean id="messageListener" class="com.dev.messagequeen.listener.MessageQueenListener"/>
    <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="springConnectionFactory"/>
        <property name="destination" ref="msgQueen"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>
四、代码编写
MessageQueenListener.java(原文此处误标为MsgProducer.java;MsgProducer的源码在文中未给出)
package com.dev.messagequeen.listener;
import com.dev.messagequeen.model.UserModel;
import org.springframework.jms.listener.SessionAwareMessageListener;
import javax.jms.*;
public class MessageQueenListener implements SessionAwareMessageListener<Message> {
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
System.out.println(mapMessage.getString("message"));
}
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
UserModel userModel = (UserModel)objectMessage.getObject();
if(userModel != null){
System.out.println("接受到的消息是:\r\n"+userModel.toString());
}
}
}
}
MsgConsumer.java
package com.dev.messagequeen.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
 * Placeholder consumer service, registered as the Spring bean "msgConsumer".
 *
 * <p>It holds an injected {@link JmsTemplate} but does not consume anything yet:
 * {@link #consumerMsg()} is an empty stub.
 */
@Service("msgConsumer")
public class MsgConsumer {
    /** JMS template injected by Spring. */
    @Autowired
    private JmsTemplate jmsTmp;

    /** Intended to consume a message; not implemented yet. */
    public void consumerMsg(){
        // TODO: to be continued
    }
}
UserModel.java
package com.dev.messagequeen.model;
import java.io.Serializable;
/**
 * Simple serializable user bean, sent as the payload of a JMS ObjectMessage.
 *
 * <p>Because instances are serialized between producer and consumer, the class
 * declares an explicit {@code serialVersionUID}, and it overrides
 * {@link #toString()} so that printing a received instance shows its fields
 * rather than the default {@code ClassName@hash} form.
 */
public class UserModel implements Serializable {

    /** Fixed serial version so producer and consumer builds stay compatible. */
    private static final long serialVersionUID = 1L;

    private String name;
    private String addr;
    private String nickName;
    private int age;

    /** @return the user's name, or {@code null} if not set */
    public String getName() {
        return name;
    }

    /** @param name the user's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's address, or {@code null} if not set */
    public String getAddr() {
        return addr;
    }

    /** @param addr the user's address */
    public void setAddr(String addr) {
        this.addr = addr;
    }

    /** @return the user's nickname, or {@code null} if not set */
    public String getNickName() {
        return nickName;
    }

    /** @param nickName the user's nickname */
    public void setNickName(String nickName) {
        this.nickName = nickName;
    }

    /** @return the user's age ({@code 0} if not set) */
    public int getAge() {
        return age;
    }

    /** @param age the user's age */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Returns a readable representation listing every field.
     *
     * @return a string of the form
     *         {@code UserModel{name=..., addr=..., nickName=..., age=...}}
     */
    @Override
    public String toString() {
        return "UserModel{name=" + name
                + ", addr=" + addr
                + ", nickName=" + nickName
                + ", age=" + age
                + "}";
    }
}
单元测试:
AppTest.java
package com.dev;
import com.dev.messagequeen.model.UserModel;
import com.dev.messagequeen.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Integration test that loads spring-context.xml and sends ten
 * {@link UserModel} messages through {@link MsgProducer}.
 *
 * <p>The test only sends; it makes no assertions.
 *
 * <p>The runner is {@link SpringJUnit4ClassRunner}, which this file imports.
 * NOTE(review): this file also imports {@code SpringRunner}, which only exists
 * in spring-test 4.3 and later; when building against 4.1.8.RELEASE that unused
 * import must be removed or the Spring version raised.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml"})
public class AppTest {

    @Autowired
    private MsgProducer msgProducer;

    /** Sends ten user messages with distinct names and ages of 10, 20, ..., 100. */
    @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            UserModel userModel = new UserModel();
            userModel.setName("这是用户姓名" + i);
            userModel.setAddr("中国云南");
            userModel.setAge((i + 1) * 10);
            userModel.setNickName("NickName");
            msgProducer.sendMsg(userModel);
        }
    }
}
五、整体项目结构
測試說明:
- 配置了MessageQueenListener即配置了消費端,無需自己再實現消費端。
- 必須添加trustedPackages屬性並指定model包,才能消費對象消息(ObjectMessage)
更多期待:
- 與Spring集成可以有多個生產者和多個消費者
- 採用ActiveMQ集群
- 添加多線程支持
转载于:https://my.oschina.net/u/940506/blog/914955