【问题标题】:How to add tasklet to run after each partition step completion in Spring Batch如何在 Spring Batch 中的每个分区步骤完成后添加 tasklet 以运行
【发布时间】:2021-02-25 07:42:57
【问题描述】:

我是 Spring Batch 的新手,我正在实施一个 Spring Batch 作业,它必须从数据库中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。

@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}    

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}    

由于数据集很大,我已将任务执行器的线程池值配置为 10,网格大小为 50。通过此设置,10 个线程一次写入 10 个文件,读取器以块的形式读取文件,因此读取器处理器和写入器流程正在迭代多次(一组 10 个,在移动到下一个分区之前)。

现在,我想添加一个 tasklet,一旦一个线程的所有迭代(读取、处理、写入)完成,即在每个分区完成后,我可以在其中压缩文件。

我确实有一个清理 tasklet 最后要运行,但是有压缩逻辑意味着首先获取从每个分区生成的所有文件,然后执行压缩。请提出建议。

【问题讨论】:

    标签: spring-boot spring-batch batch-processing spring-batch-tasklet


    【解决方案1】:

    您可以将您的工作步骤multiOperationStep 更改为面向块的步骤的FlowStep,然后是一个简单的小任务步骤,您可以在其中进行压缩。也就是说,worker step其实就是两个step合二为一FlowStep

    【讨论】:

      猜你喜欢
      • 2015-05-21
      • 1970-01-01
      • 2019-12-11
      • 1970-01-01
      • 2018-02-26
      • 1970-01-01
      • 2016-12-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多