本实现使用的Exchange类型为DirectExchange,routingkey的名称默认为Queue的名称。异步发送消息。
1.配置文件
-
#============== rabbitmq config ====================
rabbit.hosts=192.168.36.102
rabbit.username=admin
rabbit.password=admin
rabbit.virtualHost=/
rabbit.queue=spring-queue-async
# Routing key: by default the same as the queue name.
rabbit.routingKey=spring-queue-async
2.生产者配置applicationContext-rabbitmq-async-send.xml:
-
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <!-- Resolves the ${rabbit.*} placeholders used below from application.properties.
         A missing properties file is tolerated (ignoreResourceNotFound=true). -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
        <property name="ignoreResourceNotFound" value="true" />
        <property name="locations">
            <list>
                <!-- Standard configuration -->
                <value>classpath*:/application.properties</value>
            </list>
        </property>
    </bean>

    <!-- Connection factory for the RabbitMQ broker -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="${rabbit.hosts}"/>
        <property name="username" value="${rabbit.username}"/>
        <property name="password" value="${rabbit.password}"/>
        <property name="virtualHost" value="${rabbit.virtualHost}"/>
        <property name="channelCacheSize" value="5"/>
    </bean>

    <!-- RabbitAdmin bound to the connection factory above.
         Note: this context declares no Queue/Exchange/Binding beans. -->
    <bean id="rabbitAdmin"
          class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="rabbitConnectionFactory" />
    </bean>

    <!-- RabbitTemplate used by the producer; only the routing key is configured,
         no exchange is set on the template. -->
    <bean id="rabbitTemplate"
          class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
        <property name="routingKey" value="${rabbit.routingKey}"></property>
    </bean>
</beans>
3.生产者发送消息代码Send.java
-
package cn.slimsmart.rabbitmq.demo.spring.async; -
import org.springframework.amqp.core.AmqpTemplate; -
import org.springframework.amqp.rabbit.core.RabbitTemplate; -
import org.springframework.context.ApplicationContext; -
import org.springframework.context.support.ClassPathXmlApplicationContext; -
public class Send { -
public static void main(String[] args) throws InterruptedException { -
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-send.xml"); -
AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class); -
for(int i=0;i<1000;i++){ -
amqpTemplate.convertAndSend("test spring async=>"+i); -
Thread.sleep(3000); -
} -
} -
}
4.处理消息类ReceiveMsgHandler.java
-
package cn.slimsmart.rabbitmq.demo.spring.async; -
/**
 * Message handler for the consumer side. The receiver context wires an
 * instance of this class into a {@code MessageListenerAdapter} with
 * {@code handleMessage} as the listener method.
 */
public class ReceiveMsgHandler {

    /**
     * Prints the received message body to standard output.
     *
     * @param text the message payload as text
     */
    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }
}
5.配置applicationContext-rabbitmq-async-receive.xml:
-
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <!-- Resolves the ${rabbit.*} placeholders used below from application.properties.
         A missing properties file is tolerated (ignoreResourceNotFound=true). -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
        <property name="ignoreResourceNotFound" value="true" />
        <property name="locations">
            <list>
                <!-- Standard configuration -->
                <value>classpath*:/application.properties</value>
            </list>
        </property>
    </bean>

    <!-- Connection factory for the RabbitMQ broker -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="${rabbit.hosts}"/>
        <property name="username" value="${rabbit.username}"/>
        <property name="password" value="${rabbit.password}"/>
        <property name="virtualHost" value="${rabbit.virtualHost}"/>
        <property name="channelCacheSize" value="5"/>
    </bean>

    <!-- Message converter: SimpleMessageConverter -->
    <bean id="messageConverter"
          class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>

    <!-- Begin: listening for messages sent by the producer -->

    <!-- Handler class that processes received messages -->
    <bean id="receiveHandler"
          class="cn.slimsmart.rabbitmq.demo.spring.async.ReceiveMsgHandler">
    </bean>

    <!-- MessageListenerAdapter that delegates to receiveHandler.handleMessage -->
    <bean id="receiveListenerAdapter"
          class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="receiveHandler" />
        <property name="defaultListenerMethod" value="handleMessage"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>

    <!-- Listener container (SimpleMessageListenerContainer). The queueNames value
         must be exactly the name of the queue defined on the broker. -->
    <bean id="listenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queueNames" value="${rabbit.queue}"></property>
        <property name="connectionFactory" ref="rabbitConnectionFactory"></property>
        <property name="messageListener" ref="receiveListenerAdapter"></property>
    </bean>

    <!-- End: listening for messages sent by the producer -->
</beans>
6.接收消息启动类Receive.java
-
package cn.slimsmart.rabbitmq.demo.spring.async; -
import org.springframework.context.support.ClassPathXmlApplicationContext; -
public class Receive { -
public static void main(String[] args) { -
new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-receive.xml"); -
} -
}
启动接收消息,再发送消息
-
Received: test spring async=>0 -
Received: test spring async=>1 -
Received: test spring async=>2 -
Received: test spring async=>3 -
Received: test spring async=>4 -
Received: test spring async=>5 -
Received: test spring async=>6 -
Received: test spring async=>7 -
......
若报如下错误,说明消息队列不存在,请在控制台添加消息队列。
-
log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment). -
log4j:WARN Please initialize the log4j system properly. -
Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'listenerContainer'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup -
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:170) -
at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154) -
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:339) -
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143) -
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108) -
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:931) -
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:472) -
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) -
at cn.slimsmart.rabbitmq.demo.spring.async.Consumer.main(Consumer.java:7) -
Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup -
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:333) -
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:360) -
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167) -
... 8 more -
Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it. -
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228) -
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516) -
at java.lang.Thread.run(Unknown Source) -
Caused by: java.io.IOException -
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) -
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) -
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) -
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:788) -
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) -
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) -
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) -
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) -
at java.lang.reflect.Method.invoke(Unknown Source) -
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348) -
at com.sun.proxy.$Proxy8.queueDeclarePassive(Unknown Source) -
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213) -
... 2 more -
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""} -
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) -
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) -
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) -
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) -
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) -
... 11 more -
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""} -
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473) -
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313) -
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) -
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) -
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
解决办法:在RabbitMQ管理控制台中,于虚拟主机"/"下添加名为spring-queue-async的队列。
原文地址:https://blog.csdn.net/zhu_tianwei/article/details/40919031