【Posted】: 2015-12-03 05:19:50
【Question】:
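I have a "happy path" working (below).
How can I advise a .transform call so that it invokes an error flow (via errorChannel) without interrupting mainFlow?
Currently, mainFlow terminates on the first failure in the second .transform (when the payload cannot be deserialized to the type). The behavior I want is to log the failure and continue processing.
I have read about ExpressionEvaluatingRequestHandlerAdvice. Would I add a second argument to each .transform call, such as e -> e.advice(myAdviceBean), and declare such a bean with success and error channels? I assume I would need to split mainFlow to receive the success from each transform.
At the direction of some comments, I have updated the original code sample, but I am still having trouble taking this "all the way home".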
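To make the desired "log and continue" behavior concrete, here is a minimal plain-Java sketch of the idea: a transformation is wrapped so that a failure is handed to an error sink and the item is dropped, while the remaining items keep flowing. It is an illustration only. The class and method names (LogAndContinueSketch, advised, process) are hypothetical and are not Spring Integration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Plain-Java illustration only; none of these names are Spring Integration API. */
final class LogAndContinueSketch {

    /**
     * Wraps a transformation so that a failure is reported to errorSink
     * and the item is dropped instead of aborting the caller.
     */
    static <I, O> Function<I, Optional<O>> advised(
            Function<I, O> transform, BiConsumer<I, Exception> errorSink) {
        return input -> {
            try {
                return Optional.ofNullable(transform.apply(input));
            } catch (Exception e) {
                errorSink.accept(input, e); // the "error flow": log, count, store, ...
                return Optional.empty();    // nothing continues downstream for this item
            }
        };
    }

    /** Runs every input through the advised step and keeps only the successes. */
    static <I, O> List<O> process(List<I> inputs, Function<I, Optional<O>> step) {
        List<O> results = new ArrayList<>();
        for (I input : inputs) {
            step.apply(input).ifPresent(results::add);
        }
        return results;
    }
}
```

In this sketch a failed item produces no output at all, so later stages only ever see successfully transformed items.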
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 22 more

Update (September 8, 2015)
Code sample:
@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                amazonS3InboundSynchronizationMessageSource(),
                e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            // Advise the JSON transformer so that a deserialization failure is handled by handlingAdvice()
            .transform(Transformers.fromJson(persistentType()), e -> e.advice(handlingAdvice()))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a ->
                a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // @formatter:on
}

@Bean
public ErrorService errorService() {
    return new ErrorService();
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(customErrorChannel())
            .handle("errorService", "handleError")
            .get();
    // @formatter:on
}

@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(customErrorChannel());
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
}

protected class ErrorService implements ErrorHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Override
    public void handleError(Throwable t) {
        stopEndpoints(t);
    }

    private void stopEndpoints(Throwable t) {
        log.error(ExceptionUtils.getStackTrace(t));
    }
}
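
For reference, two behaviors visible above can be modeled in a few lines of plain Java: the aggregate step releases a group once it reaches persistenceBatchSize, and the log shows the aggregator rejecting a message that carries only id and timestamp headers ("Null correlation not allowed"). The sketch below is a simplified model under those assumptions, not Spring Integration's implementation. BatchAggregatorSketch, Msg, and accept are hypothetical names, and group timeouts are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java illustration only; these types are hypothetical, not Spring Integration classes. */
final class BatchAggregatorSketch {

    /** Minimal stand-in for a message: a payload plus an optional correlation key. */
    static final class Msg {
        final String payload;
        final String correlationKey; // null models a message without correlation headers

        Msg(String payload, String correlationKey) {
            this.payload = payload;
            this.correlationKey = correlationKey;
        }
    }

    private final int batchSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    BatchAggregatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Adds a message to its group. Returns the complete batch (and forgets the group)
     * once the group reaches batchSize; otherwise returns an empty Optional.
     */
    Optional<List<String>> accept(Msg msg) {
        if (msg.correlationKey == null) {
            // Without a key the message cannot be assigned to any group.
            throw new IllegalStateException("Null correlation not allowed");
        }
        List<String> group = groups.computeIfAbsent(msg.correlationKey, k -> new ArrayList<>());
        group.add(msg.payload);
        if (group.size() == batchSize) {
            groups.remove(msg.correlationKey);
            return Optional.of(group);
        }
        return Optional.empty();
    }
}
```

In this model, any message that reaches the aggregator without a correlation key fails immediately, regardless of the release condition.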
【Discussion】:
Tags: java spring transform spring-integration dsl