【问题标题】:How to integrate a MessageHandler into the SFTP scenario based on SftpRemoteFileTemplate?如何基于 SftpRemoteFileTemplate 将 MessageHandler 集成到 SFTP 场景中?
【发布时间】:2021-10-13 12:46:01
【问题描述】:

我已经基于 Spring 的集成包中的 SftpRemoteFileTemplate 实现了从 SFTP 服务器获取文件、将文件放入和删除文件的服务。

这里 sftpGetPayload 从 SFTP 服务器获取文件并传递其内容。

这是我目前的代码:

public String sftpGetPayload(String sessionId,
        String host, int port, String user, String password,
        String remoteDir, String remoteFilename, boolean remoteRemove) {
    LOG.info("sftpGetPayload sessionId={}", sessionId);
    LOG.debug("sftpGetPayLoad host={}, port={}, user={}", host, port, user);
    LOG.debug("sftpGetPayload remoteDir={}, remoteFilename={}, remoteRemove={}",
            remoteDir, remoteFilename, remoteRemove);

    final AtomicReference<String> refPayload = new AtomicReference<>();

    SftpRemoteFileTemplate template = getSftpRemoteFileTemplate(host, port,
            user, password, remoteDir, remoteFilename);

    template.get(remoteDir + "/" + remoteFilename,
            is -> refPayload.set(getAsString(is)));
    LOG.info("sftpGetToFile {} read.", remoteDir + "/" + remoteFilename);

    deleteRemoteFile(template, remoteDir, remoteFilename, remoteRemove);

    return refPayload.get();
}

private SftpRemoteFileTemplate getSftpRemoteFileTemplate(String host, int port,
        String user, String password, String remoteDir, String remoteFilename) {
    SftpRemoteFileTemplate template =
            new SftpRemoteFileTemplate(sftpSessionFactory(host, port, user, password));
    template.setFileNameExpression(
            new LiteralExpression(remoteDir + "/" + remoteFilename));
    template.setRemoteDirectoryExpression(new LiteralExpression(remoteDir));
    //template.afterPropertiesSet();

    return template;
}

private void deleteRemoteFile(SftpRemoteFileTemplate template,
        String remoteDir, String remoteFilename, boolean remoteRemove) {
    LOG.debug("deleteRemoteFile remoteRemove={}", remoteRemove);
    if (remoteRemove) {
        template.remove(remoteDir + "/" + remoteFilename);
        LOG.info("sftpGetToFile {} removed.", remoteDir + "/" + remoteFilename);
    }
}

所有这些 GET 操作都是活动操作,这意味着要获取的文件被认为已经存在。我想要一种轮询过程,它会在 SFTP 服务器上收到文件后立即调用我的有效负载消费方法。

我找到了另一个基于 Spring bean 的实现,配置为 Spring Integration Dsl,它声明了一个 SftpSessionFactory、一个 SftpInboundFileSynchronizer、一个 SftpMessageSource ,以及一个 MessageHandler,它轮询 SFTP 站点以接收文件并自动启动消息处理程序以进行进一步处理。

这段代码如下:

    @Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(myHost);
    factory.setPort(myPort);
    factory.setUser(myUser);
    factory.setPassword(myPassword);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(myRemotePath);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(myFileFilter));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
            sftpInboundFileSynchronizer());
    source.setLocalDirectory(myLocalDirectory);
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}

如何将这个@Poller/MessageHandler/@ServiceActivator 概念包含到我上面的实现中?或者有没有办法在基于模板的实现中实现这个功能?

场景可能如下:

我有一个 Spring Boot 应用程序,其中包含多个代表任务的类。其中一些任务是通过 Spring @Scheduled 注解和 CRON 规范自动调用的,而另一些则不是。

    @Scheduled(cron = "${task.to.start.automatically.frequency}")
    public void runAsTask() {
        ...
    }

第一个任务将从 ist @Sheduled 规范开始,从 SFTP 服务器获取文件并处理它。它将使用自己的通道(host1、port1、user1、password1、remoteDir1、remoteFile1)来执行此操作。 第二个任务也将由调度程序运行并生成一些内容以放入 SFTP 服务器。它将使用自己的通道(host2、port2、user2、password2、remoteDir2、remoteFile2)来执行此操作。 host2 = host1 和 port2 = port1 很可能,但这不是必须的。 第三个任务也将由调度程序运行并生成一些内容以放入 SFTP 服务器。它将使用与 task1 相同的通道来执行此操作,但此任务是生产者(而不是像 task1 那样的消费者),并且将写入不同于 task1 的另一个文件(host1、port1、user1、password1、remoteDir3、remoteFile3)。 任务四没有@Scheduled 注释,因为它应该意识到何时从第三方接收到它必须处理的文件,因此可以在其通道(host4、port4、user4、password4、remoteDir4、remoteFile4)上获取其内容以进行处理它。

我已经阅读了整个集成的东西,但是对于这个用例来说,无论是从 XML 配置方案到带有注释的 Java 配置,还是从静态 Spring bean 方法到运行时的动态方法,都很难转换。

我知道使用 IntegrationFlow 来注册人工制品、task1 的入站适配器、task2 的出站适配器、task3 的入站适配器以及 task1 的相同(在其他任何地方注册的)会话工厂,以及 - 最后但并非最不重要的- 具有任务 4 轮询器功能的入站适配器。 或者它们都应该是具有命令功能的网关?还是我应该注册 SftpRemoteFileTemplate?

定义我拥有的频道:

public class TransferChannel {

    private String host;
    private int port;
    private String user;
    private String password;

    /* getters, setters, hash, equals, and toString */

}

要将所有 SFTP 设置放在一起,我有:

public class TransferContext {

    private boolean enabled;
    private TransferChannel channel;
    private String remoteDir;
    private String remoteFilename;
    private boolean remoteRemove;
    private String remoteFilenameFilter;
    private String localDir;

    /* getters, setters, hash, equals, and toString */

}

作为 SFTP 处理的核心,每个作业都会注入一种 DynamicSftpAdapter:

    @Scheduled(cron = "${task.to.start.automatically.frequency}")
    public void runAsTask() {

        @Autowired
        DynamicSftpAdapter sftp;

        ...

        sftp.connect("Task1", context);
        File f = sftp.getFile("Task1", "remoteDir", "remoteFile");

        /* process file content */

        sftp.removeFile("Task1", "remoteDir", "remoteFile");
        sftp.disconnect("Task1", context);
    }

DynamicSftpAdapter 只不过是一个片段:

@Component
public class DynamicSftpAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<TransferChannel, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, TransferContext> sessions = new ConcurrentHashMap<>();

    @Override
    public void connect(String sessionId, TransferContext context) {
        if (this.registrations.containsKey(context.getChannel())) {
            LOG.debug("connect, channel exists for {}", sessionId);         
        }
        else {
            // register the required SFTP Outbound Adapter
            TransferChannel channel = context.getChannel();
            IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(cashedSftpSessionFactory(
                channel.getHost(), channel.getPort(),
                channel.getUser(), channel.getPassword())));
            this.registrations.put(channel, flowContext.registration(flow).register());
            this.sessions.put(sessionId, context);
            LOG.info("sftp session {} for {} started", sessionId, context);
        }
    }

    private DefaultSftpSessionFactory sftpSessionFactory(String host, int port, String user, String password) {
        LOG.debug("sftpSessionFactory");
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    private CachingSessionFactory<LsEntry> cashedSftpSessionFactory(String host, int port, String user, String password) {
        LOG.debug("cashedSftpSessionFactory");
        CachingSessionFactory<LsEntry> cashedSessionFactory =
            new CachingSessionFactory<LsEntry>(
                sftpSessionFactory(host, port, user, password));
        return cashedSessionFactory;
    }

    @Override
    public void sftpGetFile(String sessionId, String remoteDir, String remoteFilename) {
        TransferContext context = sessions.get(sessionId);
        if (context == null)
            throw new IllegalStateException("Session not established, sessionId " + sessionId);

        IntegrationFlowRegistration register = registrations.get(context.getChannel());
        if (register != null) {
            try {
                LOG.debug("sftpGetFile get file {}", remoteDir + "/" + remoteFilename);
                register.getMessagingTemplate().send(
                    MessageBuilder.withPayload(msg)
                        .setHeader(...).build());       
            }
            catch (Exception e) {
                appContext.getBean(context, DefaultSftpSessionFactory.class)
                    .close();   
            }
        }
    }

    @Override
    public void disconnect(String sessionId, TransferContext context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context.getChannel());
        if (registration != null) {
            registration.destroy();
        }
        LOG.info("sftp session for {} finished", context);
    }
}

我不知道如何启动 SFTP 命令。当使用 OutboundGateway 并且必须立即指定 SFTP 命令(如 GET)时,我也没有得到,然后整个 SFTP 处理将在一种方法中,指定出站网关工厂并使用 get() 获取实例并可能调用以任何方式发送消息 .get()。

显然我需要帮助。

【问题讨论】:

    标签: java spring-integration sftp spring-integration-dsl


    【解决方案1】:

    首先,如果您已经使用 Spring Integration 通道适配器,则可能没有理由直接使用像 RemoteFileTemplate 这样的低级 API。

    其次存在技术差异:SftpInboundFileSynchronizingMessageSource 将生成一个本地文件 - 远程文件的完整副本。因此,当我们在下游处理您的 SftpRemoteFileTemplate 逻辑时,它不会很好地工作,因为我们只会带来一个本地文件 (java.io.File),而不是用于远程文件表示的实体。

    即使sftpGetPayload() 中的逻辑看起来不像需要这样一个单独的方法那样复杂和自定义,最好将SftpRemoteFileTemplate 作为单例并在工作时在不同组件之间共享它针对同一个 SFTP 服务器。它只是简单的无状态 Spring 模板模式实现。

    如果您仍然坚持使用上述集成流程中的方法,您应该考虑为 @ServiceActivator(inputChannel = "sftpChannel") 调用 POJO 方法。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations

    您还可以找到 SFTP 出站网关作为您的用例的有用组件。它有一些常见的场景实现:https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway

    【讨论】:

    • 非常感谢 Artem!你是对的,我使用 sftpGetPayLoad 的基于模板的代码完全按照你的说法使用:一个在不同组件之间共享的单例,所有组件在同一个 sftp 服务器上都有自己的通道(具有不同的用户/密码和文件和目录)。我对此非常满意。但不包括入站文件的消费者不知道何时收到文件的用例。
    • 对于该用例,需要一个轮询机制。上面的 Java 配置 Dsl 代码确实是 Spring 文档中的 SFTP 入站通道适配器,但它是静态的,仅用于一个名为“sftpChannel”的通道。我再次需要一些更灵活的东西,比如可供几个组件使用的可配置单例​​。但是您向我指出了适配器/网关:也许我正在寻找基于 IntegrationFlow 的 SFTP 入站通道适配器?
    • 好吧,SftpRemoteFileTemplate 不是您代码中的单例。每次调用 sftpGetPayload() 方法时都会创建它。那是我的观点。也许您可以解释您的业务目标,我会想我们可以为您使用 Spring Integration 做些什么。尽管如此,还是有办法将IntegrationFlow 作为模板。查看文档中的动态流程:docs.spring.io/spring-integration/docs/current/reference/html/…
    • 感谢您的解释。我可能不明白。更好的软件设计可能是 SftpRemoteFileTemplate 是一个单例。为了服务多个“通道”,我将在每次使用单例时设置主机、端口、用户、密码等,而不是每次都创建一个实例?
    • 如何解释我的业务目标?在这里发表评论?
    猜你喜欢
    • 2015-10-16
    • 1970-01-01
    • 2021-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多