【Question title】: What is the proper way to advise a failed transform w/ Spring Integration Java DSL
【Posted】: 2015-12-03 05:19:50
【Question description】:

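I've got a "happy path" working (below).

How can I advise a .transform call so that it invokes an error flow (via errorChannel) without interrupting the mainFlow?

Currently the mainFlow terminates on the first failure in the second .transform (when the payload cannot be deserialized to the type). The behavior I want is to log the failure and continue processing.

I've read about ExpressionEvaluatingRequestHandlerAdvice. Would I just add a second parameter to each .transform call, like e -> e.advice(myAdviceBean), and declare such a bean with success and error channels? I assume I'd need to break up mainFlow to receive the success from each transform.

Following some direction from a commenter, I updated the original code sample. But I'm still having trouble taking this "all the way home".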

2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 22 more

UPDATE (2015-09-08)

Code sample

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()), e -> e.advice(handlingAdvice()))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // @formatter:on
}

@Bean
public ErrorService errorService() {
    return new ErrorService();
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(customErrorChannel())
            .handle("errorService", "handleError")
            .get();
    // @formatter:on
}

@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(customErrorChannel());
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
}

protected class ErrorService implements ErrorHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Override
    public void handleError(Throwable t) {
        stopEndpoints(t);
    }

    private void stopEndpoints(Throwable t) {
        log.error(ExceptionUtils.getStackTrace(t));
    }

}

【Question comments】:

    Tags: java spring transform spring-integration dsl


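    【Answer 1】:

    Yes, you are correct. To advise the handle() method of the Transformer's MessageHandler, you should use exactly that e.advice method on the second parameter of the .transform() EIP method. And yes: you should define an ExpressionEvaluatingRequestHandlerAdvice bean for your purpose.

    You can reuse that Advice bean for different targets to handle successes and failures in the same manner.

    UPDATE

    Although it isn't clear to me how you'd like to continue the flow with the wrong message, you can use the onFailureExpression and returnFailureExpressionResult=true of the ExpressionEvaluatingRequestHandlerAdvice to return something after the unzipErrorChannel().

    BTW, the failureChannel logic doesn't work without an onFailureExpression: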

    if (this.onFailureExpression != null) {
        Object evalResult = this.evaluateFailureExpression(message, actualException);
        if (this.returnFailureExpressionResult) {
            return evalResult;
        }
    }
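
The branch order quoted above can be modelled in plain Java to see what each flag does. This is only a sketch under stated assumptions: FailureAdviceModel, its fields and the list standing in for the failure channel are hypothetical names, the trapException branch is my assumption about what follows the quoted snippet, and the real advice sends an ErrorMessage to the channel rather than the bare expression result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the failure branch quoted above; not Spring's actual class. */
final class FailureAdviceModel {

    // Hypothetical stand-ins for the advice's properties.
    Function<String, Object> onFailureExpression;           // null means "not configured"
    boolean returnFailureExpressionResult;
    boolean trapException;
    final List<Object> failureChannel = new ArrayList<>();  // records what would be sent

    /** Invokes the handler; on failure follows the branch order from the snippet. */
    Object invoke(Function<String, Object> handler, String payload) {
        try {
            return handler.apply(payload);
        } catch (RuntimeException ex) {
            if (onFailureExpression != null) {
                Object evalResult = onFailureExpression.apply(payload);
                failureChannel.add(evalResult); // only reached when an expression is set
                if (returnFailureExpressionResult) {
                    return evalResult;          // travels downstream like a normal reply
                }
            }
            if (!trapException) {
                throw ex;                       // assumed branch: rethrow when not trapped
            }
            return null;                        // no reply for this message
        }
    }

    /** Toy "transformer": fails on anything that is not an integer. */
    static Object parse(String s) {
        return Integer.parseInt(s);
    }

    public static void main(String[] args) {
        FailureAdviceModel advice = new FailureAdviceModel();
        advice.onFailureExpression = p -> p;    // plays the role of the "payload" expression
        advice.trapException = true;

        advice.returnFailureExpressionResult = true;
        System.out.println(advice.invoke(FailureAdviceModel::parse, "oops"));

        advice.returnFailureExpressionResult = false;
        System.out.println(advice.invoke(FailureAdviceModel::parse, "oops"));
        System.out.println(advice.invoke(FailureAdviceModel::parse, "42"));
    }
}
```

In this model, returning the failure result hands the untransformed payload to whatever comes next, which appears to be what the trace in the question shows reaching the aggregator. Trapping the exception without returning the result yields no reply for the failed message, while later messages are handled normally.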
    

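    【Comments】:

    • Hi, Artem. I'm still struggling. I've updated the flow and provided the trace again; I hope you have time to take another look. I don't understand what else I need to do w.r.t. just logging and moving on after a failed transform. Ideally I'd collect the String (from the InputStream) and write the payload to a renamed file, e.g. <filename>.failed.json. I do see the error logged, but it then fails downstream of ErrorService with Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
    【Answer 2】: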

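    It turns out I had things wrong in a few places:

    • I had to autowire a Jackson 2 ObjectMapper (which I get from Spring Boot auto-configuration) and construct a JsonObjectMapper instance to pass as the second argument to Transformers.fromJson. This makes unmarshalling to the persistent type more lenient (it stops the UnrecognizedPropertyException) and so removes the need for ExpressionEvaluatingRequestHandlerAdvice.

    • I had to choose the proper variant of the .split method in IntegrationFlowDefinition in order to employ the FileSplitter. Otherwise you don't get this splitter but a DefaultMessageSplitter, which prematurely terminates the flow after the first record read from the InputStream.

    • I moved transform, aggregate and handle to their own publish-subscribe channel that uses an async task executor.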
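
The second point is easy to miss because a lambda such as f -> new FileSplitter() still compiles. A minimal plain-Java sketch of why, assuming the one-argument overload treats its lambda as a configurer whose return value is discarded (all names below are hypothetical stand-ins, not the DSL's own types):

```java
import java.util.function.Consumer;

/** Plain-Java model of the overload pitfall; hypothetical names, not the DSL's API. */
final class SplitOverloadModel {

    interface Splitter {
        String name();
    }

    static final class DefaultSplitter implements Splitter {
        public String name() { return "default"; }
    }

    static final class LineSplitter implements Splitter {
        public String name() { return "line"; }
    }

    /** Spec handed to a configurer lambda; it always wraps the default splitter. */
    static final class SplitterSpec {
        final Splitter splitter = new DefaultSplitter();
    }

    /** Overload 1: the argument only configures the endpoint around a default splitter. */
    static String split(Consumer<SplitterSpec> configurer) {
        SplitterSpec spec = new SplitterSpec();
        configurer.accept(spec);
        return spec.splitter.name();
    }

    /** Overload 2: the caller supplies the splitter explicitly. */
    static String split(Splitter splitter, Consumer<SplitterSpec> configurer) {
        return splitter.name();
    }

    public static void main(String[] args) {
        // "new LineSplitter()" is a valid statement expression, so the lambda fits
        // Consumer<SplitterSpec>; the new instance is created and thrown away.
        System.out.println(split(spec -> new LineSplitter()));   // prints "default"

        // Passing the splitter as its own argument is what actually selects it.
        System.out.println(split(new LineSplitter(), null));     // prints "line"
    }
}
```

The compiler accepts the first call without a warning, so the mistake only shows up at runtime as unexpected splitting behavior.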

    It's still not 100% of what I need, but it's much further along.

    See what I ended up with below...

    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    public class DataMigrationModule {
    
    private final Logger log = LoggerFactory.getLogger(getClass());
    
    @Value("${cloud.aws.credentials.accessKey}")
    private String accessKey;
    
    @Value("${cloud.aws.credentials.secretKey}")
    private String secretKey;
    
    @Value("${cloud.aws.s3.bucket}")
    private String bucket;
    
    @Value("${cloud.aws.s3.max-objects-per-batch:1024}")
    private int maxObjectsPerBatch;
    
    @Value("${cloud.aws.s3.accept-subfolders:false}")
    private String acceptSubFolders;
    
    @Value("${cloud.aws.s3.remote-directory}")
    private String remoteDirectory;
    
    @Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
    private String localDirectoryRef;
    
    @Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
    private String localSubdirectory;
    
    @Value("${cloud.aws.s3.filename-wildcard:}")
    private String fileNameWildcard;
    
    @Value("${app.persistent-type:}")
    private String persistentType;
    
    @Value("${app.repository-type:}")
    private String repositoryType;
    
    @Value("${app.persistence-batch-size:2500}")
    private int persistenceBatchSize;
    
    @Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
    private int persistenceBatchReleaseTimeoutMillis;
    
    @Autowired
    private ListableBeanFactory beanFactory;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    private final AtomicBoolean invoked = new AtomicBoolean();
    
    private Class<?> repositoryType() {
        try {
            return Class.forName(repositoryType);
        } catch (ClassNotFoundException cnfe) {
            log.error("Unknown repository implementation!", cnfe);
            System.exit(0);
        }
        return null;
    }
    
    private Class<?> persistentType() {
        try {
            return Class.forName(persistentType);
        } catch (ClassNotFoundException cnfe) {
            log.error("Unsupported type!", cnfe);
            System.exit(0);
        }
        return null;
    }
    
    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }
    
    @Bean
    public FileToInputStreamTransformer unzipTransformer() {
        FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
        transformer.setDeleteFiles(true);
        return transformer;
    }
    
    @Bean
    public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
        AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
        AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
        messageSource.setCredentials(credentials);
        messageSource.setBucket(bucket);
        messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
        messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
        messageSource.setRemoteDirectory(remoteDirectory);
        if (!fileNameWildcard.isEmpty()) {
            messageSource.setFileNameWildcard(fileNameWildcard);
        }
        String directory = System.getProperty(localDirectoryRef);
        if (!localSubdirectory.startsWith("/")) {
            localSubdirectory = "/" + localSubdirectory;
        }
        if (!localSubdirectory.endsWith("/")) {
            localSubdirectory = localSubdirectory + "/";
        }
        directory = directory + localSubdirectory;
        FileUtils.mkdir(directory);
        messageSource.setDirectory(new LiteralExpression(directory));
        return messageSource;
    }
    
    @Bean
    public IntegrationFlow mainFlow() {
        // @formatter:off
        return IntegrationFlows
                .from(
                        amazonS3InboundSynchronizationMessageSource(),
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime))
                )
                .transform(unzipTransformer())
                .split(new FileSplitter(), null)
                .publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
                .get();
        // @formatter:on
    }
    
    @Bean
    public IntegrationFlow persistenceSubFlow() {
        JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
        ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
                persistenceBatchReleaseTimeoutMillis);
        // @formatter:off
        return f -> f
                .transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
                // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
                .aggregate(
                        a -> a
                            .releaseStrategy(releaseStrategy)
                            .correlationStrategy(m -> m.getHeaders().get("id"))
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            , null
                )
                .handle(jdbcRepositoryHandler());
        // @formatter:on
    }
    
    @Bean
    public JdbcRepositoryHandler jdbcRepositoryHandler() {
        return new JdbcRepositoryHandler(repositoryType(), beanFactory);
    }
    
    protected class JdbcRepositoryHandler extends AbstractMessageHandler {
    
        @SuppressWarnings("rawtypes")
        private Insertable repository;
    
        public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
            repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
        }
    
        @Override
        protected void handleMessageInternal(Message<?> message) {
            repository.insert((List<?>) message.getPayload());
        }
    
    }
    
    protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
    
        @Override
        protected InputStream transformFile(File payload) throws Exception {
            return new GZIPInputStream(new FileInputStream(payload));
        }
    }
    
    }
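
One detail in the aggregator above may be worth double-checking. It correlates on the id header, and message ids are unique per message, so every message would land in a group of its own and a count-based release threshold might never be reached. A minimal plain-Java model of that grouping effect follows; Msg and group are hypothetical stand-ins rather than Spring types, and the constant key is only one possible alternative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

/** Plain-Java model of correlation keys; hypothetical names, not Spring's aggregator. */
final class CorrelationModel {

    /** Stand-in for a message: a unique id "header" plus a payload. */
    static final class Msg {
        final UUID id = UUID.randomUUID();
        final String payload;

        Msg(String payload) {
            this.payload = payload;
        }
    }

    /** Groups messages by whatever key the correlation function returns. */
    static Map<Object, List<Msg>> group(List<Msg> messages, Function<Msg, Object> correlation) {
        Map<Object, List<Msg>> groups = new LinkedHashMap<>();
        for (Msg m : messages) {
            groups.computeIfAbsent(correlation.apply(m), k -> new ArrayList<>()).add(m);
        }
        return groups;
    }

    /** Builds n messages with distinct ids. */
    static List<Msg> sample(int n) {
        List<Msg> messages = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            messages.add(new Msg("record-" + i));
        }
        return messages;
    }

    public static void main(String[] args) {
        List<Msg> messages = sample(5);

        // Correlating on the unique id: one group per message, each of size 1.
        System.out.println(group(messages, m -> m.id).size());

        // Correlating on a shared key: a single group that can grow to batch size.
        System.out.println(group(messages, m -> "batch").size());
    }
}
```

If batching by count is the goal, a key shared by all records that belong together (for example, something identifying the source file) would be the kind of value to correlate on.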
    
