【发布时间】:2021-10-13 12:46:01
【问题描述】:
我已经基于 Spring 的集成包中的 SftpRemoteFileTemplate 实现了从 SFTP 服务器获取文件、将文件放入和删除文件的服务。
这里 sftpGetPayload 从 SFTP 服务器获取文件并传递其内容。
这是我目前的代码:
public String sftpGetPayload(String sessionId,
String host, int port, String user, String password,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.info("sftpGetPayload sessionId={}", sessionId);
LOG.debug("sftpGetPayLoad host={}, port={}, user={}", host, port, user);
LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
remoteDir, remoteFilename, remoteRemove);
final AtomicReference<String> refPayload = new AtomicReference<>();
SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
user, password, remoteDir, remoteFilename);
template.get(remoteDir + "/" + remoteFilename,
is -> refPayload.set(getAsString(is)));
LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);
deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);
return refPayload.get();
}
private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
String user, String password, String remoteDir, String remoteFilename) {
SftpRemoteFileTemplate template =
new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
template.setFileNameExpression(
new LiteralExpression(remoteDir + "/" + remoteFilename));
template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
//template.afterPropertiesSet();
return template;
}
private void deleteRemoteFile(SftpRemoteFileTemplate template,
String remoteDir, String remoteFilename, boolean remoteRemove) {
LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
if (remoteRemove) {
template.remove(remoteDir + "/" + remoteFilename);
LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
}
}
所有这些 GET 操作都是活动操作,这意味着要获取的文件被认为已经存在。我想要一种轮询过程,它会在 SFTP 服务器上收到文件后立即调用我的有效负载消费方法。
我找到了另一个基于 Spring bean 的实现,配置为 Spring Integration Dsl,它声明了一个 SftpSessionFactory、一个 SftpInboundFileSynchronizer、一个 SftpMessageSource ,以及一个 MessageHandler,它轮询 SFTP 站点以接收文件并自动启动消息处理程序以进行进一步处理。
这段代码如下:
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(myHost);
factory.setPort(myPort);
factory.setUser(myUser);
factory.setPassword(myPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(myRemotePath);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(myLocalDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
如何将这个@Poller/MessageHandler/@ServiceActivator 概念包含到我上面的实现中?或者有没有办法在基于模板的实现中实现这个功能?
场景可能如下:
我有一个 Spring Boot 应用程序,其中包含多个代表任务的类。其中一些任务是通过 Spring @Scheduled 注解和 CRON 规范自动调用的,而另一些则不是。
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
...
}
第一个任务将从 ist @Sheduled 规范开始,从 SFTP 服务器获取文件并处理它。它将使用自己的通道(host1、port1、user1、password1、remoteDir1、remoteFile1)来执行此操作。 第二个任务也将由调度程序运行并生成一些内容以放入 SFTP 服务器。它将使用自己的通道(host2、port2、user2、password2、remoteDir2、remoteFile2)来执行此操作。 host2 = host1 和 port2 = port1 很可能,但这不是必须的。 第三个任务也将由调度程序运行并生成一些内容以放入 SFTP 服务器。它将使用与 task1 相同的通道来执行此操作,但此任务是生产者(而不是像 task1 那样的消费者),并且将写入不同于 task1 的另一个文件(host1、port1、user1、password1、remoteDir3、remoteFile3)。 任务四没有@Scheduled 注释,因为它应该意识到何时从第三方接收到它必须处理的文件,因此可以在其通道(host4、port4、user4、password4、remoteDir4、remoteFile4)上获取其内容以进行处理它。
我已经阅读了整个集成的东西,但是对于这个用例来说,无论是从 XML 配置方案到带有注释的 Java 配置,还是从静态 Spring bean 方法到运行时的动态方法,都很难转换。
我知道使用 IntegrationFlow 来注册人工制品、task1 的入站适配器、task2 的出站适配器、task3 的入站适配器以及 task1 的相同(在其他任何地方注册的)会话工厂,以及 - 最后但并非最不重要的- 具有任务 4 轮询器功能的入站适配器。 或者它们都应该是具有命令功能的网关?还是我应该注册 SftpRemoteFileTemplate?
定义我拥有的频道:
public class TransferChannel {
private String host;
private int port;
private String user;
private String password;
/* getters, setters, hash, equals, and toString */
}
要将所有 SFTP 设置放在一起,我有:
public class TransferContext {
private boolean enabled;
private TransferChannel channel;
private String remoteDir;
private String remoteFilename;
private boolean remoteRemove;
private String remoteFilenameFilter;
private String localDir;
/* getters, setters, hash, equals, and toString */
}
作为 SFTP 处理的核心,每个作业都会注入一种 DynamicSftpAdapter:
@Scheduled(cron = "${task.to.start.automatically.frequency}")
public void runAsTask() {
@Autowired
DynamicSftpAdapter sftp;
...
sftp.connect("Task1", context);
File f = sftp.getFile("Task1", "remoteDir", "remoteFile");
/* process file content */
sftp.removeFile("Task1", "remoteDir", "remoteFile");
sftp.disconnect("Task1", context);
}
DynamicSftpAdapter 只不过是一个片段:
@Component
public class DynamicSftpAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);
@Autowired
private IntegrationFlowContext flowContext;
@Autowired
private ApplicationContext appContext;
private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();
private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();
@Override
public void connect(String sessionId, TransferContext context) {
if (this.registrations.containsKey(context.getChannel())) {
LOG.debug("connect, channel exists for {}", sessionId);
}
else {
// register the required SFTP Outbound Adapter
TransferChannel channel = context.getChannel();
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
channel.getHost(), channel.getPort(),
channel.getUser(), channel.getPassword())));
this.registrations.put(channel, flowContext.registration(flow).register());
this.sessions.put(sessionId, context);
LOG.info("sftp session {} for {} started", sessionId, context);
}
}
private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return factory;
}
private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
LOG.debug("cashedSftpSessionFactory");
CachingSessionFactory<LsEntry> cashedSessionFactory =
new CachingSessionFactory<LsEntry>(
sftpSessionFactory(host, port, user, password));
return cashedSessionFactory;
}
@Override
public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
TransferContext context = sessions.get(sessionId);
if (context == null)
throw new IllegalStateException("Session not established, sessionId " + sessionId);
IntegrationFlowRegistration register = registrations.get(context.getChannel());
if (register != null) {
try {
LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
register.getMessagingTemplate().send(
MessageBuilder.withPayload(msg)
.setHeader(...).build());
}
catch (Exception e) {
appContext.getBean(context, DefaultSftpSessionFactory.class)
.close();
}
}
}
@Override
public void disconnect(String sessionId, TransferContext context) {
IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
if (registration != null) {
registration.destroy();
}
LOG.info("sftp session for {} finished", context);
}
}
我不知道如何启动 SFTP 命令。当使用 OutboundGateway 并且必须立即指定 SFTP 命令(如 GET)时,我也没有得到,然后整个 SFTP 处理将在一种方法中,指定出站网关工厂并使用 get() 获取实例并可能调用以任何方式发送消息 .get()。
显然我需要帮助。
【问题讨论】:
标签: java spring-integration sftp spring-integration-dsl