【Question title】: Streaming from remote SFTP directories and sub-directories with Spring Integration
【Posted】: 2020-04-02 09:04:21
【Question】:

I am using the Spring Integration Streaming Inbound Channel Adapter to get a stream from a remote SFTP server and parse each line of the content.

I use:

IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
                          .filter(remoteFileFilter)
                          .remoteDirectory("test_dir"),
                        e -> e.id("sftpInboundAdapter")
                              .autoStartup(true)
                              .poller(Pollers.fixedDelay(fetchInt)))
                .handle(Files.splitter(true, true))
....

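This works now, but I can only get files from the test_dir directory. I need to get files recursively from this directory and its sub-directories, and parse each line.

I noticed that the Inbound Channel Adapter, Sftp.inboundAdapter(sftpSessionFactory).scanner(...), can scan sub-directories. But I did not see anything equivalent for the Streaming Inbound Channel Adapter.

So, how can I get files recursively from a directory with the Streaming Inbound Channel Adapter?

Thanks.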

【Comments】:

    Tags: spring-integration spring-integration-dsl spring-integration-sftp


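    【Answer 1】:

    You can use two outbound gateways. The first one does ls -R (recursive listing); then split the result and use a second gateway configured with get -stream to fetch each file as a stream.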
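
    To make the shape of that pipeline concrete, here is a minimal sketch that uses the local filesystem as a stand-in for the SFTP server. It is only an analogy, not Spring Integration code: Files.walk plays the role of ls -R with names relative to the base directory, the for loop plays the role of the splitter, and Files.lines plays the role of a streamed get followed by a line splitter. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-filesystem analogy only; none of this talks to an SFTP server.
final class RecursiveLinePipelineSketch {

    /** Stage 1: recursive listing that returns paths relative to baseDir. */
    static List<String> listRecursively(Path baseDir) throws IOException {
        try (Stream<Path> walk = Files.walk(baseDir)) {
            return walk.filter(Files::isRegularFile)
                    .map(p -> baseDir.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /** Stage 2: open one file as a stream and pass each line to the sink. */
    static void streamLines(Path baseDir, String relativeName, Consumer<String> sink)
            throws IOException {
        // The relative name is resolved against the base directory, much like
        // prefixing the listed name with the remote directory in the GET expression.
        try (Stream<String> lines = Files.lines(baseDir.resolve(relativeName))) {
            lines.forEach(sink);
        }
    }

    /** Lists every file, then streams each one line by line. */
    static List<String> run(Path baseDir) throws IOException {
        List<String> out = new ArrayList<>();
        for (String name : listRecursively(baseDir)) { // the "split" step
            streamLines(baseDir, name, line -> out.add(name + ": " + line));
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("test_dir");
        Files.createDirectories(dir.resolve("sub"));
        Files.write(dir.resolve("a.txt"), Arrays.asList("one", "two"));
        Files.write(dir.resolve("sub").resolve("b.txt"), Arrays.asList("three"));
        run(dir).forEach(System.out::println);
    }
}
```

    The point of the two stages is that the listing only produces names; each file is opened and consumed on its own afterwards, so no file has to be copied to local disk first.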

    EDIT

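    import java.io.File;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.Pollers;
    import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.Command;
    import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.Option;
    import org.springframework.integration.file.remote.session.CachingSessionFactory;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.splitter.FileSplitter;
    import org.springframework.integration.metadata.SimpleMetadataStore;
    import org.springframework.integration.sftp.dsl.Sftp;
    import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
    import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

    import com.jcraft.jsch.ChannelSftp.LsEntry;

    @SpringBootApplication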
    public class So60987851Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So60987851Application.class, args);
        }
    
        @Bean
        IntegrationFlow flow(SessionFactory<LsEntry> csf) {
            return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
                    .handle(Sftp.outboundGateway(csf, Command.LS, "payload")
                            .options(Option.RECURSIVE, Option.NAME_ONLY)
                            // need a more robust metadata store for persistence, unless the files are removed
                            .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                    .split()
                    .log()
                    .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
                    .handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
                            .options(Option.STREAM))
                    .split(new FileSplitter())
                    .log()
                    // instead of a filter, we can remove the remote file.
                    // but needs some logic to wait until all lines read
    //              .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
    //              .log()
                    .get();
        }
    
        @Bean
        CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
            return new CachingSessionFactory<>(sf);
        }
    
        @Bean
        DefaultSftpSessionFactory sf() {
            DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
            sf.setHost("10.0.0.8");
            sf.setUser("gpr");
            sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
            sf.setAllowUnknownKeys(true);
            return sf;
        }
    
    }
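
    The commented-out RM step above needs "some logic to wait until all lines read". As a rough illustration of that ordering constraint only, here is a local-file sketch: the file is deleted strictly after the reader has hit end-of-file and been closed, and it is left in place if the line consumer fails. The class and method names are hypothetical, and this is not the gateway wiring. In the flow itself, one possible direction is to enable the splitter's file markers and react to the END marker, which is not shown here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Local-file analogy for "remove the remote file only after every line was read".
final class RemoveAfterConsumedSketch {

    /**
     * Passes each line to the sink and deletes the file afterwards.
     * Returns the number of lines delivered.
     */
    static int consumeThenRemove(Path file, Consumer<String> sink) throws IOException {
        int count = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sink.accept(line); // an exception here skips the delete below
                count++;
            }
        }
        // Reached only when all lines were delivered and the reader is closed.
        Files.delete(file);
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("sketch", ".txt");
        Files.write(file, java.util.Arrays.asList("one", "two"));
        int lines = consumeThenRemove(file, System.out::println);
        System.out.println(lines + " lines read, file still exists: " + Files.exists(file));
    }
}
```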
    

    【Comments】:

    • I am new to Spring Integration. Is there an example?
    • I added an example to the answer.
    【Answer 2】:

    This worked for me; here is my full code:

    @Configuration
    public class SftpIFConfig {
    
    @InboundChannelAdapter(value = "sftpMgetInputChannel",
            poller = @Poller(fixedDelay = "5000"))
    public String filesForMGET(){
        return "/upload/done";
    }
    
    
    @Bean
    public IntegrationFlow sftpMGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
        return IntegrationFlows.from("sftpMgetInputChannel")
                .handle(Sftp.outboundGateway(csf,
                                AbstractRemoteFileOutboundGateway.Command.LS, "payload")
                        .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE,  AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
                        //Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                .split()
                .log(message -> "file path -> "+message.getPayload())
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'/upload/done/' + payload"))
                .log(message -> "Header file info -> "+message.getHeaders())
                .handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/upload/done/' + payload")
                        .options(AbstractRemoteFileOutboundGateway.Option.STREAM))
                .split(new FileSplitter())
                .log(message -> "File content -> "+message.getPayload())
                .get();
    }
    
    @Bean
    CachingSessionFactory<ChannelSftp.LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }
    
    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("0.0.0.0");
        factory.setPort(2222);
        factory.setAllowUnknownKeys(true);
        factory.setUser("xxxx");
        factory.setPassword("xxx");
        return factory;
    }
    }
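
    The filter is what stops the same files from being fetched again on every poll. As a simplified model of that idea (not the library's implementation), the sketch below keeps a key/value store where the key is a prefix plus the file name and the value is the file's modification timestamp; an entry is accepted only when it is new or its timestamp changed. The class name and the use of a plain Map as the store are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a timestamp-based accept-once filter.
final class AcceptOnceSketch {

    private final Map<String, String> store; // stands in for a metadata store
    private final String prefix;

    AcceptOnceSketch(Map<String, String> store, String prefix) {
        this.store = store;
        this.prefix = prefix;
    }

    /** Returns the names that are new, or whose timestamp differs from the stored one. */
    List<String> filter(Map<String, Long> listing) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : listing.entrySet()) {
            String key = prefix + entry.getKey();
            String stamp = Long.toString(entry.getValue());
            String previous = store.put(key, stamp); // remember the latest timestamp
            if (!stamp.equals(previous)) {
                accepted.add(entry.getKey());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch(new HashMap<>(), "test");
        Map<String, Long> listing = new LinkedHashMap<>();
        listing.put("a.txt", 100L);
        listing.put("sub/b.txt", 200L);
        System.out.println(filter.filter(listing)); // first poll: both files
        System.out.println(filter.filter(listing)); // second poll: nothing new
    }
}
```

    Because the store in this model lives in memory, a restart forgets everything and all files are accepted again. An in-memory store such as SimpleMetadataStore has the same property, so a persistent store is needed if files must not be re-read after a restart.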
    

    【Comments】:

    • Please add more explanation of why your code works, so that the asker can replicate it in their own situation.