【问题标题】:Launch JobLaunchRequest for each new file in AWS S3 with Spring Batch Integration使用 Spring Batch 集成为 AWS S3 中的每个新文件启动 JobLaunchRequest
【发布时间】:2021-01-12 15:51:52
【问题描述】:

我正在关注文档:Spring Batch IntegrationIntegration AWS 结合使用以汇集 AWS S3。

但在某些情况下,每个文件的批处理执行不起作用。

AWS S3 池工作正常,所以当我放置一个新文件或启动应用程序并且存储桶中有文件时,应用程序会与本地目录同步:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

我按照教程创建了FileMessageToJobRequest 我不会把代码放在这里,因为它与文档相同

所以我创建了 bean IntegrationFlow 和 FileMessageToJobRequest:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource, 
                         c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(delimitedFileJob);
        return fileMessageToJobRequest;
    }

所以在 JobLaunchingGateway 我认为是问题所在:

如果我这样创建:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

案例1(应用启动时Bucket为空):

  • 我在 AWS S3 中上传了一个新文件;
  • 池化工作,文件出现在本地目录中;
  • 但是转换/作业没有被触发;

案例2(应用启动时Bucket已经有一个文件):

  • 作业已启动:
2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]
  • 如果我在 S3 中添加第二个文件,则不会像案例 1 那样启动作业。

案例 3(Bucket 有多个文件):

  • 文件在本地目录中正确同步
  • 但作业只针对最后一个文件执行一次。

所以按照docs,我将网关更改为:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setOutputChannel(replyChannel());
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

使用这个新的网关实现,如果我在 S3 中放置一个新文件,应用程序会做出反应,但没有转换并给出错误:

Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest

如果存储桶中有两个文件(应用程序启动时)FILE1.csv 和 FILE2.csv,则 FILE1.csv 的作业正确运行,但 FILE2.csv 出现上述错误。

实现这样的事情的正确方法是什么?

为了清楚起见,我想在这个存储桶中接收数千个 csv 文件,使用 Spring Batch 读取和处理,但我还需要尽快从 S3 获取每个新文件。

提前致谢。

【问题讨论】:

    标签: spring-batch spring-integration spring-integration-aws


    【解决方案1】:

    JobLaunchingGateway 确实只期望我们将JobLaunchRequest 作为有效负载。

    既然您在 S3InboundFileSynchronizingMessageSource bean 定义上拥有 @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30")),那么在没有 FileMessageToJobRequest 之间的变压器的情况下为 JobLaunchingGateway 拥有 @ServiceActivator(inputChannel = IN_CHANNEL_NAME 确实是错误的。

    您的 integrationFlow 对我来说看起来不错,但是您确实需要从 S3InboundFileSynchronizingMessageSource bean 中删除 @InboundChannelAdapter 并完全依赖 c.poller() 配置。

    另一种方法是离开 @InboundChannelAdapter,然后从 IN_CHANNEL_NAME 而不是 MessageSource 开始 IntegrationFlow

    由于您有多个轮询器针对同一个 S3 源,而且两者都基于同一个本地目录,所以看到这么多意外情况也就不足为奇了。

    【讨论】:

    • 你是对的。愚蠢的错误...我决定从 S3InboundFile 中删除 @InboundChannelAdapter 并且它可以工作。
    • 好吧,这并不是那么愚蠢,尤其是当您从某个地方复制/粘贴它时。更重要的是,它甚至可以做到这一点,并在您的应用程序中进行如此复杂的配置。你有一个支持 S3 的并发消费者。但我很高兴它很容易修复!
    • 是的,我没有阅读这两个文档并合并示例。你认为我所做的对我的案子来说是正确的方法吗?我的意思是在可扩展性方面,例如 100k 文件上传到我的存储桶并且我的应用程序可扩展,使用这种批量集成的方法,他们会并发尝试处理同一个文件吗?
    • 如果您要扩展,您需要考虑使用共享元数据存储的持久文件列表过滤器。请参阅 Spring Integration 中的 FTP 文档。
    猜你喜欢
    • 2015-08-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-04
    • 2016-02-26
    • 2017-06-12
    • 2017-04-29
    相关资源
    最近更新 更多