【Question title】: Inappropriate exception when Apache Camel interceptor throws one
【Posted】: 2020-08-24 18:26:08
【Question】:

I created an interceptor to test the exception handler in a Camel route. Here is the interceptor:

context.getRouteDefinition("AnalisingResponse").adviceWith(context,
            new AdviceWithRouteBuilder() {
                @Override
                public void configure() {
                    interceptSendToEndpoint("direct:PsMessageLogger_updateMessage")
                        .routeId("interceptor")
                        .skipSendToOriginalEndpoint()
                        .throwException(exception);
                }
            });

This is the route I want to apply adviceWith to:

      from("direct:AnalisingResponse")
            .routeId("AnalisingResponse")
            .log("start PsASetStatementProcessor_AnalisingResponse")
            .process(exchange -> {
                exchange.setProperty(CONDITION,
                    bodyDataContainer(exchange)
                        .getValue("/*[1]/Body/container/Msg/BSMessage/BSError/@Cod")  // TODO local-name
                );
            })
            .process((BaseAction) exchange -> {
                DataContainer localBody = propertyDataContainer(exchange, LOCAL_BODY);

                Long serviceId = Long.parseLong(localBody.getValue("/Message/ServiceID"));
                Long taskId = Long.parseLong(localBody.getValue("/Message/TaskID"));
                Long statusId = 39L;
                Long mtType = 0L;

                String requestVar = bsDbSwiftCorpService.updateTask(taskId, serviceId, mtType, statusId);

                exchange.setProperty(REQUEST_VAR, requestVar);
            })
            .process((BaseAction) exchange -> {
                DataContainer localBody = propertyDataContainer(exchange, LOCAL_BODY);
                localBody.replace("./Message/MessageStatusID", "3");
                exchange.setProperty(LOCAL_BODY, localBody);

                template.sendBody(PsMessageLogger.DIRECT_ROUTE_UPDATE_MESSAGE, localBody.copy());
            });

And this is the route I want to intercept:

        from("direct:PsMessageLogger_updateMessage")
                .routeId("PsMessageLogger_updateMessage")
                .log("start PsMessageLogger_updateMessage")
                .onException(Exception.class)
                    .handled(true)
                    .to("direct:PsMessageLogger_updateMessage_errorHandler")
                .end()
                .process((BaseAction) exchange -> {
                    DataContainer body = bodyDataContainer(exchange);

                    Long messageID = new Long(body.getValue("/Message/MessageID"));
                    Long messageStatusID = new Long(body.getValue("/Message/MessageStatusID"));
                    Long taskID = new Long(body.getValue("/Message/TaskID"));
                    Long taskStatusGenID = new Long(body.getValue("/Message/TaskStatusGenID"));

                    bsMessageLoggerToDbService.updateMessage(messageID, messageStatusID, taskID, taskStatusGenID);
                });

As a result, however, the EXCEPTION_CAUGHT property does not contain the exception I threw. It contains an org.apache.camel.CamelExecutionException whose cause is a NullPointerException. Here is its stack trace:

org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-SERVO-1598117736905-0-3]
    at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1842)
    at org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:715)
    at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:515)
    at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:511)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:163)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:168)
    at ru.vtb.swiftcorp.psASetStatementProcessor.proxy.services.PsASetStatementProcessor.lambda$configure$8(PsASetStatementProcessor.java:149)
    at ru.vtb.swiftcorp.common.BaseAction.process(BaseAction.java:30)
    at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:76)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:76)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:76)
    at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:186)
    at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:86)
    at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:529)
    at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:494)
    at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:369)
    at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:494)
    at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:229)
    at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:144)
    at org.apache.camel.builder.DefaultFluentProducerTemplate.send(DefaultFluentProducerTemplate.java:306)
    at ru.vtb.swiftcorp.psASetStatementProcessor.proxy.services.PsASetStatementProcessorTest.testSuccessRoute_caseExceptionOnPsMessageLogger_updateMessage(PsASetStatementProcessorTest.java:546)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: java.lang.NullPointerException
    at java.util.ArrayDeque.addFirst(ArrayDeque.java:233)
    at java.util.ArrayDeque.push(ArrayDeque.java:508)
    at org.apache.camel.processor.FatalFallbackErrorHandler.process(FatalFallbackErrorHandler.java:79)
    at org.apache.camel.processor.RedeliveryErrorHandler.deliverToFailureProcessor(RedeliveryErrorHandler.java:1063)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:474)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.impl.InterceptSendToEndpointProcessor.process(InterceptSendToEndpointProcessor.java:80)
    at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:186)
    at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:86)
    at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:529)
    at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:494)
    at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:369)
    at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:494)
    at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:229)
    at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:144)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:161)
    ... 66 more
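
To check which exception types are actually present in a nested trace like this one, the cause chain can be walked with plain Java. The following is only a minimal sketch: the class CauseChain and the method findCause are hypothetical names, and a plain RuntimeException stands in for the Camel wrapper so that the snippet runs without Camel on the classpath.

```java
import java.util.Optional;

class CauseChain {
    /**
     * Returns the first throwable in the cause chain (starting with t itself)
     * that is an instance of the given type, or empty if there is none.
     */
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        int depth = 0;
        // The depth limit guards against pathological cyclic cause chains.
        for (Throwable cur = t; cur != null && depth < 100; cur = cur.getCause(), depth++) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the shape of the trace: a wrapper whose cause is an NPE.
        RuntimeException wrapper = new RuntimeException("wrapper", new NullPointerException());
        System.out.println(findCause(wrapper, NullPointerException.class).isPresent());
        System.out.println(findCause(wrapper, IllegalStateException.class).isPresent());
    }
}
```

For a chain shaped like the trace, a lookup for NullPointerException finds a match, while a lookup for any type that is not in the chain comes back empty, which matches the observation that the originally thrown exception is no longer reachable from the wrapper.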

As far as I can tell, the error occurs in the method public boolean process(final Exchange exchange, final AsyncCallback callback) of org.apache.camel.processor.FatalFallbackErrorHandler, when it tries to push a null value onto a deque.
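
The two innermost frames of the trace (ArrayDeque.push calling ArrayDeque.addFirst) are consistent with that reading, because java.util.ArrayDeque does not accept null elements. A minimal, Camel-free sketch of that behaviour follows; the class name ArrayDequeNullDemo is made up for illustration, and the snippet says nothing about which value Camel was trying to push.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ArrayDequeNullDemo {
    /** Returns true if pushing null onto an ArrayDeque throws NullPointerException. */
    static boolean pushRejectsNull() {
        Deque<String> stack = new ArrayDeque<>();
        try {
            stack.push(null); // push delegates to addFirst, which rejects null
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("push(null) throws NPE: " + pushRejectsNull());
    }
}
```

So the NullPointerException reported as the cause originates in the deque itself, not in the route's own processors.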

How can I get the exception I need in the error handler?

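【Comments】:

  • Can you add the route you are trying to test?
  • @SneharghyaPathak I added it.
  • None of the routes you added has the routeId AnalisingResponse, which is the one you reference in the interceptor. I also don't see any route that sends to direct:PsMessageLogger_updateMessage.
  • @SneharghyaPathak I updated the routes and added the routeId, but the problem remains.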

Tags: java apache-camel interceptor spring-camel


【Answer 1】:

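interceptSendToEndpoint("direct:PsMessageLogger_updateMessage") looks for the actual string direct:PsMessageLogger_updateMessage when it tries to intercept. In your AnalisingResponse route you send to that endpoint dynamically from inside a processor, which is why it cannot be intercepted.

In the third processor of AnalisingResponse, remove the producer template and set localBody as the message body instead: exchange.getIn().setBody(localBody). Then, after that process step, add a to("direct:PsMessageLogger_updateMessage").

Your AnalisingResponse route should look like this: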

from("direct:AnalisingResponse")
    .routeId("AnalisingResponse")
    .log("start PsASetStatementProcessor_AnalisingResponse")
    .process(exchange -> {
        //...
    })
    .process((BaseAction) exchange -> {
        //...
    })
    .process((BaseAction) exchange -> {
        DataContainer localBody = propertyDataContainer(exchange, LOCAL_BODY);
        localBody.replace("./Message/MessageStatusID", "3");
        exchange.setProperty(LOCAL_BODY, localBody);
        
        exchange.getIn().setBody(localBody);
    })
    .to("direct:PsMessageLogger_updateMessage");

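【Comments】:

  • Thanks, that works. It still looks a bit odd, though, because my interceptor does work: when I send the body with ProducerTemplate.sendBody(), the exception I expect is thrown first, but then another exception occurs inside FatalFallbackErrorHandler.
  • The ProducerTemplate sends localBody successfully, but your interceptSendToEndpoint cannot find direct:PsMessageLogger_updateMessage.
  • Then I don't understand why a success-path test case like this passes: messageLoggerMock.message(0).body().isEqualTo(localBody);. In that case the message logger interceptor redirects the exchange to messageLoggerMock.
  • Is there a repository you can share so I can take a look?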