【问题标题】:JMS/ActiveMQ MessageConsumer.recieve() not returningJMS/ActiveMQ MessageConsumer.recieve() 不返回
【发布时间】:2017-05-22 10:13:58
【问题描述】:

我有一个 Java 应用程序,它使用我认为相当样板的 JMS 代码连接到 ActiveMQ 服务器并从其接收消息。

this.consumerFactory = new ActiveMQConnectionFactory(this.ingestItemBrokerUrl);
this.consumerConnection = this.consumerFactory.createConnection();
this.consumerConnection.start();
this.consumerSession = this.consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName());
this.consumer = this.consumerSession.createConsumer(this.consumerDestination);

我的应用程序在循环中调用MessageConsumer.receive()(直到连接关闭)来处理到达 ActiveMQ 主题的消息:

message = this.consumer.receive();

谜底如下:

当我连接到在 localhost 上运行的 ActiveMQ 服务器时,它按预期工作。但是,当我连接到在 Azure 云机器上运行的 ActiveMQ 服务器(加载了 Bitnami 的 ActiveMQ 堆栈)时,receive() 调用会无限期地阻塞,即使我可以从 AMQ 管理控制台看到我的客户端已连接并取消了消息。

为什么从本地服务器切换到远程服务器时会看到不同的行为?如何进一步排除故障?

我的云 activemq.xml 配置文件如下:

<beans xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
 <property name="algorithm" value="PBEWithMD5AndDES"/>
 <property name="password" value="**REDACTED**"/>
</bean>

<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer">
  <constructor-arg ref="configurationEncryptor"/>
  <property name="location" value="file:${activemq.conf}/credentials-enc.properties"/>
</bean>

<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop">
</bean>

<!--
    The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    <plugins>
        <!--simpleAuthenticationPlugin>
            <users>
                <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="admins"/>
            </users>
        </simpleAuthenticationPlugin-->

        <!-- if not already set, set ttl to 1 minutes -->
        <timeStampingBrokerPlugin zeroExpirationOverride="60000"/>

    </plugins>

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic="&gt;" producerFlowControl="true" memoryLimit="1mb">
              <pendingSubscriberPolicy>
                <vmCursor/>
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue="&gt;" producerFlowControl="true" memoryLimit="1mb">
              <!-- Use VM cursor for better latency
                   For more information, see:

                   http://activemq.apache.org/message-cursors.html

              <pendingQueuePolicy>
                <vmQueueCursor/>
              </pendingQueuePolicy>
              -->
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <!--
        The managementContext is used to configure how ActiveMQ is exposed in
        JMX. By default, ActiveMQ uses the MBean server that is started by
        the JVM. For more information, see:

        http://activemq.apache.org/jmx.html
    -->
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>

    <!--
        Configure message persistence for the broker. The default persistence
        mechanism is the KahaDB store (identified by the kahaDB tag).
        For more information, see:

        http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>


      <!--
        The systemUsage controls the maximum amount of space the broker will
        use before slowing down producers. For more information, see:
        http://activemq.apache.org/producer-flow-control.html
        If using ActiveMQ embedded - the following limits could safely be used:

    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="20 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="1 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="100 mb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    -->
      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="64 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="1 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="100 mb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

    <!--
        The transport connectors expose ActiveMQ over a given protocol to
        clients and other brokers. For more information, see:

        http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>


        <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;trace=true&amp;needClientAuth=true"/>
    </transportConnectors>

                    <!-- SSL Configuration Context -->
            <sslContext>
               <sslContext keyStore="file:${activemq.conf}/amq-server.ks"
                           keyStorePassword="**REDACTED**"
                                       trustStore="file:${activemq.conf}/amq-server.ts"
                   trustStorePassword="**REDACTED**" />
            </sslContext>


</broker>

<!--
    Enable web consoles, REST and Ajax APIs and demos
    The web consoles requires by default login, you can disable this in the jetty.xml file

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>

</beans><!-- END SNIPPET: example -->

【问题讨论】:

  • receive 方法被设计为无限期阻塞,直到收到消息这是正常的,您是否在消费者运行时向主题发送了消息而不消费它,在这种情况下接收方法不返回?云中的配置不同吗?你能发布 AMQ 云配置吗
  • 我知道receive() 应该阻止直到消息到达。使用 locahost,它会阻塞,直到我发送消息。使用云 AMQ,即使在我发送消息后它也会阻塞。
  • 你能发布云AMQ的xml配置吗
  • 添加云 activemq.xml 配置。

标签: java azure jms activemq message-queue


【解决方案1】:

您可以尝试使用此作为目标名称this.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName() + "?consumer.prefetchSize=1");

或致电

consumerFactory.getPrefetchPolicy().setTopicPrefetch(1);

尝试使用消息监听器

    MessageListener listner = new MessageListener() {
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("Received message"
                            + textMessage.getText() + "'");
                }
            } catch (JMSException e) {
                System.out.println("Caught:" + e);
                e.printStackTrace();
            }
        }
    };

    consumer.setMessageListener(listner);

【讨论】:

  • 我尝试了第一个建议。同样的结果:recieve() 永远不会返回。
  • 第二个建议不可能。我的 consumerFactory 没有 getPrefetchPolicy 方法。
  • 您的云版本是 4.x 吗?
  • 云机器上的ActiveMQ版本5.14.1。本地主机上的 v5.14.0。
  • 对不起,如果问题如此简单,因为您遇到的问题很奇怪,但要验证您是否启动了消费者,并且只有在他的接收方法被阻止时,您才会向同一主题发送消息同一个主机。如果您按此顺序执行此操作,我不知道嫌疑人是否是destinationPolicy 和vmCursor,因为我从未使用过它,而且我看不到任何其他可能导致您的问题的东西。
猜你喜欢
  • 2015-02-05
  • 1970-01-01
  • 2015-12-01
  • 2011-01-20
  • 1970-01-01
  • 2022-12-03
  • 2017-01-25
  • 2011-05-03
  • 2018-10-10
相关资源
最近更新 更多