一、本文章包含的内容
1、列举了ActiveMQ中通过Queue方式发送、消费队列的代码(普通文本、json/xml字符串、对象数据)
2、spring+activemq方式
二、配置信息
1、activemq的pom.xml信息
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!--activemq Begin--> <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>
<!--activemq End-->
|
2、activemq的配置文件:spring-jms.xml
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
<!-- 启用spring mvc 注解 --> <context:component-scan base-package="org.soa.test.activemq"/>
<!-- 配置JMS连接工厂 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--解决接收消息抛出异常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->
<property name="trustAllPackages" value="true"/>
<!-- 是否异步发送 -->
<property name="useAsyncSend" value="true" />
</bean>
<!-- Queue模式 Begin -->
<!-- 定义消息队列(Queue) -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>defaultQueueName</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。(Queue) -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="queueDestination" />
<property name="pubSubDomain" value="false"/>
<!--接收超时时间-->
<!--<property name="receiveTimeout" value="10000" />-->
</bean>
<!-- Queue模式 End -->
|
三、队列发送端及测试程序
1、发送代码
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package org.soa.test.activemq.queues;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.util.List;
/** * Created by JamesC on 16-9-22.
*/
@Componentpublic class ProduceMsg {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 向指定队列发送消息
*/
public void sendMessage(Destination destination, final String msg) {
System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送消息(默认队列名称在bean:queueDestination配置)
*/
public void sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println("向队列" + destination + "发送了消息------------" + msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送消息
*/
public void sendMessageConvertAndSend(final Object msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println("向队列" + destination + "发送了消息------------" + msg);
//使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组
jmsTemplate.convertAndSend(destination, msg);
}
/**
* 向指定队列发送消息
*/
public void sendStudentInfo(Destination destination, final StudentInfo msg) {
System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(msg);
}
});
}
} |
2、测试程序
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package org.soa.test.activemq.queues;
import com.alibaba.fastjson.JSON;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.Destination;
import java.util.Date;
/** * Created by JamesC on 16-9-22.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-jms.xml")
public class ProduceMsgTest extends AbstractJUnit4SpringContextTests {
@Autowired
protected ApplicationContext ctx;
/**
* 队列名queue1 这里使用jms配置文件中的数据
*/
@Autowired
private Destination queueDestination;
/**
* 队列消息生产者
*/
@Autowired
private ProduceMsg produceMessage;
//向默认队列发消息(文本)
@Test
public void produceMsg_DefaultQueue() {
String msg = "这里是向默认队列发送的消息" + new Date().toString();
produceMessage.sendMessage(msg);
}
//向默认队列发消息(Json字符串)
@Test
public void produceMsg_Json() {
StudentInfo info = new StudentInfo();
info.setId(1);
info.setStdName("李磊");
info.setStdNo("001");
info.setEnterDate(new Date()); //队列存放的是时间戳
String alibabaJson = JSON.toJSONString(info);
produceMessage.sendMessage(alibabaJson);
}
//向默认队列发消息(使用convertAndSend发送对象)
@Test
public void produceMsg_ConvertAndSend() {
StudentInfo info = new StudentInfo();
info.setId(1);
info.setStdName("李磊");
info.setStdNo("001");
info.setEnterDate(new Date());
produceMessage.sendMessageConvertAndSend(info);
}
//向指定队列发消息(文本)
@Test
public void produceMsg_CustomQueue() {
for (int i = 0; i < 20; i++) {
ActiveMQQueue myDestination = new ActiveMQQueue("queueCustom");
produceMessage.sendMessage(myDestination, "----发送消息给queueCustom");
}
}
//向指定队列发消息(队列名称从XML读取)
@Test
public void produceMsg_XmlQueue() {
for (int i = 0; i < 20; i++) {
ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean("queueDestination");
produceMessage.sendMessage(destinationQueue, "----send my msg to queueXml");
}
}
//向指定队列发消息(发送对象)
@Test
public void produceMsg_StudentInfo() {
StudentInfo info = new StudentInfo();
info.setId(1);
info.setStdName("李磊");
info.setStdNo("001");
info.setEnterDate(new Date());
ActiveMQQueue destination = new ActiveMQQueue("StudentInfo");
produceMessage.sendStudentInfo(destination, info);
}
} |
四、队列消费端及测试程序
1、消费代码
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package org.soa.test.activemq.queues;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
/** * Created by JamesC on 16-9-22.
*/
@Componentpublic class ConsumeMsg {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 接受消息
*/
public String receive(Destination destination) {
TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
String msg = "";
try {
msg = tm.getText();
System.out.println("从队列" + destination.toString() + "收到了消息:\t" + msg);
} catch (JMSException e) {
e.printStackTrace();
return "";
}
return msg;
}
/**
* 接受消息
*/
public StudentInfo receiveStudentInfo() {
try {
String destination = jmsTemplate.getDefaultDestination().toString();
ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);
return (StudentInfo)msg.getObject();
} catch (JMSException e) {
//检查性异常转换为非检查性异常
throw JmsUtils.convertJmsAccessException(e);
}
}
/**
* 接受消息
*/
public Object receiveConvertAndReceive() {
String destination = jmsTemplate.getDefaultDestination().toString();
Object msg = jmsTemplate.receiveAndConvert(destination);
return msg;
}
} |
2、测试程序
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package org.soa.test.activemq.queues;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/** * Created by JamesC on 16-9-22.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-jms.xml")
public class ConsumeMsgTest {
@Autowired
private ConsumeMsg consumeMsg;
//从指定队列接收消息(文本)
@Test
public void receiveMsg() {
//没有消息阻塞一段时间后会抛异常
//java.lang.NullPointerException
ActiveMQQueue destination = new ActiveMQQueue("defaultQueueName");
consumeMsg.receive(destination);
}
//从指定队列接收消息(StudentInfo对象消息)
@Test
public void receiveStudentInfo() {
StudentInfo msg = consumeMsg.receiveStudentInfo();
System.out.println(msg.getStdName());
}
//从指定队列接收消息(Json对象)
@Test
public void receiveConvertAndReceive() {
StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();
System.out.println(msg.getStdName());
}
} |