【发布时间】:2019-04-30 11:42:12
【问题描述】:
我已经用任务执行器设置了文件轮询器
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
如所见,我已将固定 threadpool 设置为 10 条,每次轮询最多发送 10 条消息。如果我放置 10 个文件,它仍然会一个一个地处理。这里有什么问题?
* 更新 *
虽然我现在有其他问题,但在 Gary 的回答之后它工作得非常好。
我已经像这样设置了我的轮询器
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
使用AcceptAll 的原因是因为同一个文件可能会再次出现,这就是我先移动文件的原因。但是当我启用线程执行器时,同一个文件正在被多个线程处理,我假设是因为AcceptAllFile
如果我更改为AcceptOnceFileListFilter 它可以工作,但是再次出现的相同文件将不会再次被拾取!有什么办法可以避免这个问题?
问题/错误
在课堂AbstractPersistentAcceptOnceFileListFilter 我们有这个代码
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
例如,如果我设置了每个轮询 5 的最大值并且有两个文件,那么它可能的同一个文件将被两个线程拾取。
假设我的代码在我阅读文件后会移动文件。
但是另一个线程获取到accept 方法
如果文件不存在,那么它会返回 lastModified 时间为 0 并且它会返回 true。
这会导致问题,因为该文件不存在。
如果它为 0,那么它应该返回 false,因为该文件不再存在。
【问题讨论】:
标签: java spring-integration spring-integration-dsl