【问题标题】:Spring Integration DSL Custom Error Channel Issue with ExecutorSpring Integration DSL 自定义错误通道问题与 Executor
【发布时间】:2020-01-19 16:59:16
【问题描述】:

您好,我有一个文件侦听器,它一次读取并行/多个文件

package com.example.demo.flow;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.Executors;

/**
 * Created by muhdk on 03/01/2020.
 */
@Component
@Slf4j
public class TestFlow {

    @Bean
    public StandardIntegrationFlow errorChannelHandler() {

        return IntegrationFlows.from("testChannel")
                .handle(o -> {

                    log.info("Handling error....{}", o);
                }).get();
    }

    @Bean
    public IntegrationFlow testFile() {


        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                        .errorChannel("testChannel")))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {

                    throw new RuntimeException("Failing on purpose");

                }).handle(o -> {
                });

        return testChannel.get();


    }


}

它不会进入我的自定义错误通道

但如果我删除该行

            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))

然后进入错误通道。

我怎样才能让它工作,以便它通过执行程序进入我的自定义错误通道。

【问题讨论】:

    标签: java spring-boot spring-integration spring-integration-dsl spring-integration-file


    【解决方案1】:

    看起来当使用带有多条消息的 Executor 服务时,它不适用于正常的 errorChannel,我不知道为什么

    我做了这样的改变

    @Bean
    public IntegrationFlow testFile() {
    
    
        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)
                ))
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
    
                .transform(o -> {
    
                    throw new RuntimeException("Failing on purpose");
    
                }).handle(o -> {
                });
    
        return testChannel.get();
    
    
    }
    

    这里的线

            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
    

    其余部分保持不变并且可以正常工作。

    【讨论】:

    • 您找到解决问题的方法是正确的。由于您将工作转移到完全不同的线程,因此无法通过启动轮询器中的try..catch 在该线程中捕获异常。该轮询线程没有任何错误!因此,在标头中添加错误通道是可行的方法。我们不能在框架中自己做那件事,因为我们不能假设它是你一直想做的。因此,您必须自己做出决定并应用解决方案。在文档中查看更多信息:docs.spring.io/spring-integration/reference/html/…
    猜你喜欢
    • 1970-01-01
    • 2015-01-29
    • 2014-03-03
    • 2019-06-29
    • 2016-12-21
    • 1970-01-01
    • 2018-03-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多