【Posted】:2017-05-22 10:13:58
【Question】:
I have a Java application that connects to an ActiveMQ server and receives messages from it, using what I believe is fairly boilerplate JMS code:
this.consumerFactory = new ActiveMQConnectionFactory(this.ingestItemBrokerUrl);
this.consumerConnection = this.consumerFactory.createConnection();
this.consumerConnection.start();
this.consumerSession = this.consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName());
this.consumer = this.consumerSession.createConsumer(this.consumerDestination);
My application calls MessageConsumer.receive() in a loop (until the connection is closed) to process messages arriving on the ActiveMQ topic:
message = this.consumer.receive();
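Here is the puzzle:
When I connect to an ActiveMQ server running on localhost, this works as expected. However, when I connect to an ActiveMQ server running on an Azure cloud machine (loaded with Bitnami's ActiveMQ stack), the receive() call blocks indefinitely, even though I can see in the AMQ admin console that my client is connected and that messages are being dequeued.
Why do I see different behavior when switching from the local server to the remote one? How can I troubleshoot this further?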
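One low-cost way to make such a loop observable is to wait with a timeout instead of blocking forever: in JMS, `MessageConsumer.receive(long timeout)` returns `null` when the timeout elapses, so each empty wait can be logged. The sketch below only illustrates that loop shape. It is not JMS code: a `BlockingQueue` stands in for the consumer so the example runs without a broker, and the class name, method name, and parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bounded-wait receive loop; not JMS code.
final class TimedReceiveSketch {

    // Waits up to timeoutMillis per poll and gives up after maxIdlePolls
    // consecutive empty polls, returning everything received so far.
    static List<String> receiveUntilIdle(BlockingQueue<String> source,
                                         long timeoutMillis,
                                         int maxIdlePolls) throws InterruptedException {
        List<String> received = new ArrayList<>();
        int idlePolls = 0;
        while (idlePolls < maxIdlePolls) {
            // poll(timeout) returns null on timeout, much like receive(timeout)
            String message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                idlePolls++;
                System.out.println("no message within " + timeoutMillis
                        + " ms (idle poll " + idlePolls + ")");
            } else {
                idlePolls = 0;
                received.add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        System.out.println(receiveUntilIdle(queue, 50, 2));
    }
}
```

With a real consumer, the same structure would call `this.consumer.receive(timeoutMillis)` and handle `JMSException`; the periodic "no message" log line then shows whether the client is alive but receiving nothing.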
My cloud activemq.xml configuration file is as follows:
<beans xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
<property name="algorithm" value="PBEWithMD5AndDES"/>
<property name="password" value="**REDACTED**"/>
</bean>
<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer">
<constructor-arg ref="configurationEncryptor"/>
<property name="location" value="file:${activemq.conf}/credentials-enc.properties"/>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<plugins>
<!--simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="admins"/>
</users>
</simpleAuthenticationPlugin-->
<!-- if not already set, set ttl to 1 minutes -->
<timeStampingBrokerPlugin zeroExpirationOverride="60000"/>
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
If using ActiveMQ embedded - the following limits could safely be used:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;trace=true&amp;needClientAuth=true"/>
</transportConnectors>
<!-- SSL Configuration Context -->
<sslContext>
<sslContext keyStore="file:${activemq.conf}/amq-server.ks"
keyStorePassword="**REDACTED**"
trustStore="file:${activemq.conf}/amq-server.ts"
trustStorePassword="**REDACTED**" />
</sslContext>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans><!-- END SNIPPET: example -->
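One setting in this configuration that may be worth ruling out is `<timeStampingBrokerPlugin zeroExpirationOverride="60000"/>`, which its comment describes as setting a one-minute TTL on messages that have none. Assuming the broker stamps the expiration from its own clock and the consuming client compares that expiration with its local clock (both are assumptions to verify against the ActiveMQ version in use), a client whose clock runs more than a minute ahead of the Azure machine would see every message as already expired. The arithmetic can be sketched as follows; the class, method names, and timestamps are hypothetical and are not ActiveMQ APIs.

```java
// Hypothetical illustration of expiry arithmetic under clock skew;
// none of these names are ActiveMQ APIs.
final class ExpirySkewSketch {

    // Expiration a broker would stamp if it adds the override TTL
    // to its own current time (assumed behavior).
    static long stampedExpiration(long brokerNowMillis, long overrideTtlMillis) {
        return brokerNowMillis + overrideTtlMillis;
    }

    // A message looks expired to a client when the client's clock is
    // already past the stamped expiration (0 means "never expires").
    static boolean looksExpired(long expirationMillis, long clientNowMillis) {
        return expirationMillis > 0 && clientNowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long brokerNow = 1_000_000L;                               // made-up broker time
        long expiration = stampedExpiration(brokerNow, 60_000L);   // one-minute TTL

        long clientInSync = brokerNow + 500L;                      // client 0.5 s ahead
        long clientAhead = brokerNow + 120_500L;                   // client about 2 min ahead

        System.out.println("in sync, expired? " + looksExpired(expiration, clientInSync));
        System.out.println("skewed, expired?  " + looksExpired(expiration, clientAhead));
    }
}
```

Comparing the system time on the client machine with the time on the Azure machine, or temporarily removing the plugin on a test broker, would show whether this is a factor.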
【Comments】:
-
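The receive method is designed to block indefinitely until a message arrives, so that is normal. Did you send a message to the topic while the consumer was running, and receive() still did not return? Is the configuration in the cloud different? Can you post the cloud AMQ configuration?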
-
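I know that receive() should block until a message arrives. With localhost, it blocks until I send a message. With the cloud AMQ, it keeps blocking even after I send a message.
-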
Can you post the XML configuration of the cloud AMQ?
-
Added the cloud activemq.xml configuration.
Tags: java azure jms activemq message-queue