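[Posted]: 2020-08-19 14:52:24
[Question]:
I am using a Spring inbound channel adapter to poll files from an SFTP server. The application needs to poll several directories on a single SFTP server. Because the inbound channel adapter does not allow polling more than one directory, I tried creating multiple beans of the same type with different values. Since the number of directories will grow in the future, I want to control it from the application properties and register the beans dynamically.
My code: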
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
beanFactory.registerSingleton("sftpSessionFactory", sftpSessionFactory(host, port, user, password));
beanFactory.registerSingleton("sftpInboundFileSynchronizer",
sftpInboundFileSynchronizer((SessionFactory) beanFactory.getBean("sftpSessionFactory")));
}
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(String host, String port, String user, String password) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
private SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory sessionFactory) {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setPreserveTimestamp(true);
fileSynchronizer.setRemoteDirectory("/mydir/subdir");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.pdf"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "2000"))
public MessageSource<File> sftpMessageSource(String s) {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
(AbstractInboundFileSynchronizer<ChannelSftp.LsEntry>) applicationContext.getBean("sftpInboundFileSynchronizer"));
source.setLocalDirectory(new File("/dir/subdir"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<>());
source.setMaxFetchSize(Integer.parseInt(maxFetchSize));
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return message -> {
LOGGER.info("Payload - {}", message.getPayload());
};
}
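This code works fine. However, if I create sftpMessageSource dynamically, the @InboundChannelAdapter annotation has no effect. Please suggest a way to create the sftpMessageSource and handler beans dynamically and attach the corresponding annotations.
Update:
The following code works: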
@PostConstruct
void init() {
int index = 0;
for (String directory : directories) {
index++;
int finalI = index;
IntegrationFlow flow = IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(directory)
.autoCreateLocalDirectory(true)
.localDirectory(new File("/" + directory))
.localFilter(new AcceptOnceFileListFilter<>())
.maxFetchSize(10)
.filter(new SftpSimplePatternFileListFilter("*.pdf"))
.deleteRemoteFiles(true),
e -> e.id("sftpInboundAdapter" + finalI)
.autoStartup(true)
.poller(Pollers.fixedDelay(2000)))
.handle(handler())
.get();
this.flowContext.registration(flow).register();
}
}
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(Integer.parseInt(port));
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
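The update iterates over `directories` and builds one adapter id per entry, but the post does not show how that list is filled from the application properties. Below is a minimal plain-Java sketch of that step only, with no Spring dependency. The property key `sftp.remote.directories`, the class name `SftpDirectoryConfig`, and both helper methods are hypothetical names introduced for illustration; the id scheme (`sftpInboundAdapter` plus a 1-based index) mirrors the loop above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SftpDirectoryConfig {

    // Hypothetical property key; it does not appear in the original post.
    static final String DIRECTORIES_KEY = "sftp.remote.directories";

    /** Splits the comma-separated property into trimmed, non-empty, de-duplicated directories. */
    static List<String> parseDirectories(Properties props) {
        List<String> result = new ArrayList<>();
        String raw = props.getProperty(DIRECTORIES_KEY, "");
        for (String part : raw.split(",")) {
            String dir = part.trim();
            if (!dir.isEmpty() && !result.contains(dir)) {
                result.add(dir);
            }
        }
        return result;
    }

    /** Maps each adapter id (sftpInboundAdapter1, sftpInboundAdapter2, ...) to its remote directory. */
    static Map<String, String> adapterIds(List<String> directories) {
        Map<String, String> ids = new LinkedHashMap<>();
        int index = 0;
        for (String directory : directories) {
            index++;
            ids.put("sftpInboundAdapter" + index, directory);
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Stand-in for application.properties; the directory names are examples only.
        props.load(new StringReader("sftp.remote.directories=/mydir/subdir, /mydir/other"));
        System.out.println(adapterIds(parseDirectories(props)));
    }
}
```

In a Spring Boot application the same list would more likely come from a bound property rather than a hand-rolled parser. The point of the sketch is only that each directory needs a stable, unique id before its flow is registered.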
[Discussion]:
Tags: spring spring-boot spring-integration sftp spring-integration-sftp