【问题标题】:Spring Batch Partitioning At Runtime运行时的 Spring Batch 分区
【发布时间】:2020-06-21 16:43:48
【问题描述】:

我想使用 Spring Batch 读取一个大文件。我想拆分为多个文件并使用分区在不同的线程中处理每个文件。我正在使用以下代码:

@Bean
@StepScope
public MultiResourcePartitioner partitioner() {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setKeyName("file");
    partitioner.setResources(splitFiles());
    return partitioner;
}

private Resource[] splitFiles() {
    // Read the large File available in the specified folder
    // split the file to smaller files and return them as resource list
}

@Bean
public TaskExecutorPartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
    partitionHandler.setStep(step1());
    partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
    return partitionHandler;
}

@Bean
public Step partitionedMaster() {
    return this.stepBuilderFactory.get("step1")
                .partitioner(step1().getName(), partitioner(null))
                .partitionHandler(partitionHandler())
                .build();
}

@Bean
public Job partitionedJob() {
    return this.jobBuilderFactory.get("partitionedJob")
                                .start(partitionedMaster())
                                .build();
}

@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{stepExecutionContext['file']}") Resource resource) {
    return new FlatFileItemReaderBuilder<Transaction>()
                .name("flatFileTransactionReader")
                .resource(resource)
                .fieldSetMapper(fsm)
                .build();
}

我的问题是分区程序正在对仅在应用程序启动时的文件夹中可用的文件进行分区。应用程序启动并运行后,如果同一文件夹中有新文件可用,则作业无法读取它们/对其进行分区。 我使用了@StepScope,但我仍然遇到问题。

如何在运行时动态读取和分区文件?

在第一个答案后编辑它:

您好,感谢您的意见。 我可以修改如下代码以将文件作为参数发送并调用作业,但控件仍然没有进入分区方法,因此无法利用分区。 对此有何意见?

public JobParameters getJobParameters() {
    Resource[] resources = //getFileToProcessResource
    return new JobParametersBuilder()
            .addLong(TIME, System.currentTimeMillis())
            .addString("inputFiles", resources)
            .toJobParameters();
}

JobParameters jobParameters = getJobParameters();
jobLauncher.run(partitionedJob(), jobParameters);

@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setKeyName("file");
    partitioner.setResources(resources);
    return partitioner;
}

【问题讨论】:

    标签: spring spring-batch


    【解决方案1】:

    应用程序启动并运行后,如果同一文件夹中有新文件可用,则作业无法读取它们/对其进行分区

    批处理是关于固定数据集的。在您的情况下,您开始了一项工作,但其输入数据同时发生了变化,因此这不会像您预期的那样工作。需要一个固定的数据集才能重新启动,以便在发生故障时处理相同的数据集。

    由于作业的输入是文件,因此您可以将该文件用作作业参数并配置监视服务(或类似机制)以为文件夹中的每个新文件启动一个新作业实例。

    编辑:添加示例以使分区程序了解作业参数

    @Bean
    @StepScope
    public MultiResourcePartitioner partitioner(@Value("#{jobParameters['fileName']}") String fileName) {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setKeyName("file");
        partitioner.setResources(splitFiles(fileName));
        return partitioner;
    }
    
    private Resource[] splitFiles(String fileName) {
        // Read the large File available in the specified folder
        // split the file to smaller files and return them as resource list
        return null;
    }
    

    【讨论】:

    • 您好,感谢您的意见。我可以如下修改代码以将文件作为参数发送并调用作业,但控件仍然没有进入分区方法,因此无法利用分区。对此有何意见?
    • 基本上我想做的就是每天从ftp读取一个大的csv文件并对数据进行一些处理并存储在数据库中
    • but still the control is not going inside partitioner method, hence could not leverage partitioning:您可以让分区程序读取作为作业参数传递的输入文件并使用相同的技术。我用一个例子编辑了答案。如果有帮助请接受:stackoverflow.com/help/someone-answers。谢谢。
    • 我也做过同样的事情。但是当新的作业执行被触发时,我没有看到方法 partitionedJob() 或 partitioner() 被调用。我做错了吗?
    • 我发现了我的问题。代码是对的。只是它需要成为配置的一部分。
    猜你喜欢
    • 1970-01-01
    • 2014-10-15
    • 2018-08-07
    • 2019-04-15
    • 1970-01-01
    • 2014-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多