【问题标题】:Handle large messages with Apache Camel and AMQ Artemis使用 Apache Camel 和 AMQ Artemis 处理大型消息
【发布时间】:2019-10-21 06:23:03
【问题描述】:

当我在 AMQ Artemis 队列中收到一条大消息 (100KiB+) 并尝试将此消息路由到另一个 AMQ 并且此消息具有属性 _AMQ_LARGE_SIZE 时,我收到以下错误:

14:38:56.250 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST

我知道如果我在连接工厂中设置属性minLargeMessageSize 将消息发布到 AMQ 中,则不会发生此问题。

问题是,我无法控制创建连接工厂的代码,而且有时它们没有设置大消息大小属性。

有没有办法可以在 Camel 中使用我的 Connection Factory 来处理这个问题?

*编辑

16:33:03.836 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.activemq.artemis.jms.client.ActiveMQDestination.fromAddress(ActiveMQDestination.java:119)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.getJMSDestination(ActiveMQMessage.java:386)
        at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:187)
        at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:229)
        at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:257)
        at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:214)
        at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:164)
        at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:93)
        at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:115)
        at org.apache.camel.impl.MDCUnitOfWork.<init>(MDCUnitOfWork.java:54)
        at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:32)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
        ... 11 common frames omitted

【问题讨论】:

  • javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST 的来源是哪里?是否有完整的堆栈跟踪?我在 ActiveMQ Artemis 代码库中的任何地方都看不到那个特定的措辞,所以我猜它来自 Camel 本身。您尝试将消息路由到的其他代理是什么?也是 ActiveMQ Artemis 吗?
  • 嗨 Betram,是的,我有,我会更新问题。我认为错误也来自骆驼。是的,第二个代理是 ActiveMQ Artemis,只是托管在另一个网络中。
  • 您使用的是什么版本的 ActiveMQ Artemis?
  • AMQ Artemis 版本:2.6.2 Apache Camel 版本:2.20.0
  • 您确定您使用的是 ActiveMQ Artemis 2.6.2 吗?查看 ActiveMQDestination 的源代码,line 119(引发异常的位置)与方法 fromAddress() 不对应。这表明您实际上使用的是不同版本的 Artemis。

标签: apache-camel activemq-artemis


【解决方案1】:

如果您针对 Artemis 2.x 代理使用 Artemis 1.x 客户端,则需要使用适当的 anycastPrefixmulticastPrefix 配置客户端连接的接受器,例如:

<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

【讨论】:

  • 我用 2.6.2 测试过,添加了任播和多播前缀。有同样的行为。在接受器中添加connectionTtl 并在broker.xml 中添加选项journal-buffer-size 后问题就解决了。
猜你喜欢
  • 1970-01-01
  • 2019-04-02
  • 2020-04-26
  • 1970-01-01
  • 1970-01-01
  • 2013-12-23
  • 1970-01-01
  • 2016-01-16
  • 1970-01-01
相关资源
最近更新 更多