【问题标题】:TaskExecutor is not working Spring IntegrationTaskExecutor 不工作 Spring Integration
【发布时间】:2019-04-30 11:42:12
【问题描述】:

我已经用任务执行器设置了文件轮询器

ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))


                    //move file to processing first processing                    
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();

如所见,我已将固定 threadpool 设置为 10 条,每次轮询最多发送 10 条消息。如果我放置 10 个文件,它仍然会一个一个地处理。这里有什么问题?

* 更新 *

虽然我现在有其他问题,但在 Gary 的回答之后它工作得非常好。

我已经像这样设置了我的轮询器

setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);

使用AcceptAll 的原因是因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,同一个文件正在被多个线程处理,我假设是因为AcceptAllFile

如果我更改为AcceptOnceFileListFilter 它可以工作,但是再次出现的相同文件将不会再次被拾取!有什么办法可以避免这个问题?

问题/错误

在课堂AbstractPersistentAcceptOnceFileListFilter 我们有这个代码

@Override
    public boolean accept(F file) {
        String key = buildKey(file);
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                flushIfNeeded();
                return true;
            }
            // same value in store
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                flushIfNeeded();
                return true;
            }
            return false;
        }
    }

例如,如果我设置了每个轮询 5 的最大值并且有两个文件,那么它可能的同一个文件将被两个线程拾取。

假设我的代码在我阅读文件后会移动文件。

但是另一个线程获取到accept 方法

如果文件不存在,那么它会返回 lastModified 时间为 0 并且它会返回 true。

这会导致问题,因为该文件不存在。

如果它为 0,那么它应该返回 false,因为该文件不再存在。

【问题讨论】:

    标签: java spring-integration spring-integration-dsl


    【解决方案1】:

    当您将任务执行器添加到轮询器时;所做的只是调度程序线程将轮询任务交给线程池中的一个线程; maxMessagesPerPoll 是投票任务的一部分。轮询器本身仅每 5 秒运行一次。为了得到你想要的,你应该在流中添加一个执行器通道......

    @SpringBootApplication
    public class So53521593Application {
    
        private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So53521593Application.class, args);
        }
    
        @Bean
        public IntegrationFlow flow() {
            ExecutorService exec = Executors.newFixedThreadPool(10);
            return IntegrationFlows.from(() -> "foo", e -> e
                        .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                                .maxMessagesPerPoll(10)))
                    .channel(MessageChannels.executor(exec))
                    .<String>handle((p, h) -> {
                        try {
                            logger.info(p);
                            Thread.sleep(10_000);
                        }
                        catch (InterruptedException e1) {
                            Thread.currentThread().interrupt();
                        }
                        return null;
                    })
                    .get();
        }
    }
    

    编辑

    它对我来说很好......

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                    new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                        e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                                .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .handle((p, h) -> {
                    try {
                        logger.info(p.toString());
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
    

    2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

    2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

    touch test1.txt

    2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

    EDIT1

    同意 - 转载此...

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                    new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                        e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                                .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .<File>handle((p, h) -> {
                    try {
                        p.delete();
                        logger.info(p.toString());
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
    

    2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

    2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

    2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

    2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt

    【讨论】:

    • 但是我必须将要处理的文件发送到另一个频道。我如何从这里路由?
    • 对不起,我不明白你的意思;只需将.channel() 放在变压器之前,然后从轮询器中删除执行器。
    • 我确实尝试过,但我看到了意外的行为,它看起来不是线程安全的。
    • 你的代码?该框架是线程安全的。如果你想并行处理文件,你的代码需要是线程安全的。
    • 使用FileSystemPersistentAcceptOnceFileListFilter - 只有当lastModified 时间改变时,它才会允许相同的文件名通过。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-26
    • 2014-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多