【问题标题】:SpringBatch Sharing Large Amounts of Data Between StepsSpring Batch在步骤之间共享大量数据
【发布时间】:2020-08-24 16:36:39
【问题描述】:

我需要在作业步骤之间共享相对大量的数据以实现 Spring 批处理。我知道StepExecutionContextJobExecutionContext 作为这种机制。但是,我读到了,因为它们的大小必须受到限制(少于 2500 个字符)。这对我的需要来说太小了。在我的新手一步Spring Batch 实现中,我的单步工作如下:

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;


    private static final String GET_DATA =
    "    SELECT " +
    "stuffA, " +
    "stuffB, " +
    "FROM STUFF_TABLE " +
    "ORDER BY stuffA ASC";

    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
                .name("cursorItemReader")
            .dataSource(dataSource)
            .sql(GET_DATA)
            .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
                .build();
    }

    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
    @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
    StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
            .<StuffDto, StuffDto>chunk(1)
        .reader(reader)
            .writer(writer)
            .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
    JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
            .incrementer(new RunIdIncrementer())
            .flow(exampleJobStep)
            .end()
            .build();
    }
}

从某种意义上说,这很好用,我可以成功地从数据库中读取数据并在写入器步骤中写入这样的 loggingitemwriter:

public class LoggingItemWriter implements ItemWriter<StuffDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);

    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        LOGGER.info("Writing stuff: {}", list);
    }
}

但是,我需要能够将 StuffDto(或等效项)和它的数据提供给第二步,该步骤将对它执行一些处理,而不仅仅是记录它。

如果您认为步骤和工作环境过于有限,我将不胜感激。谢谢。

【问题讨论】:

  • 您将它们存储在某个临时存储库中,例如数据库或文件系统?
  • 我正在寻找一个希望在 springbatch 的上下文中传达的解决方案
  • 只需创建一个复合编写器,它会记录日志然后进行分析。基本上,您将登录硬塞进了一个单独的步骤。哎呀,您甚至可以为此使用ItemListenerStepListener 并记录书面项目(这可能更适合该任务)。
  • 您定义了一个项目处理器,但您没有在步骤上设置它。这是故意的吗? performing some processing against it rather than just logging it:您可以在项目处理器中实现该处理。您确定需要进行第二步处理吗?
  • @M.Deinum,谢谢。是否可以解释我如何使用 itemlistener 或 steplistener 共享第二步的数据?谢谢。

标签: java spring spring-batch


【解决方案1】:

如果您不想在数据库或文件系统中写入数据,实现相同的一种方法如下:

  1. 在您的 config 类中创建您自己的作业上下文 bean,并使用 @JobScope 对其进行注释。
  2. 为您的阅读器、处理器和编写器类实现org.springframework.batch.core.step.tasklet 接口。如果您想更好地控制步骤,您也可以使用它实现org.springframework.batch.core.StepExecutionListener
  3. 使用@Autowire 获取您自己的context 对象,并使用它的setter-getter 方法来存储和检索数据。

示例代码:

Config.java

@Autowired
private Processor processor;

@Autowired
private Reader reader;

@Autowired
private Writer writer;

@Autowired
private JobBuilderFactory jobBuilderFactory;
     
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
@JobScope
public JobContext getJobContexts() {
    JobContext newJobContext = new JobContext();
    return newJobContext;
}

@Bean
public Step reader() {
      return stepBuilderFactory.get("reader")
        .tasklet(reader)
        .build();
     }

@Bean
public Step processor() {
      return stepBuilderFactory.get("processor")
        .tasklet(processor)
        .build();
     }

@Bean
public Step writer() {
      return stepBuilderFactory.get("writer")
        .tasklet(writer)
        .build();
     }


public Job testJob() {
      
      return jobBuilderFactory.get("testJob")
        .start(reader())
        .next(processor())
        .next(writer())
        .build();
     }

//Below will start the job
@Scheduled(fixedRate = 1000)
    public void starJob(){
        
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
        
}

JobContext.java

private List<StuffDto> dataToProcess = new ArrayList<>();
private List<StuffDto> dataToWrite = new ArrayList<>();

//getter

SampleReader.java

@Component
public class SampleReader  implements Tasklet,StepExecutionListener{
        @Autowired
        private JobContext context;
       
      @Override
      public void beforeStep(StepExecution stepExecution) {
    //logic that you need to perform before the execution of this step.
      }
    
    @Override
      public void afterStep(StepExecution stepExecution) {
    //logic that you need to perform after the execution of this step.
      }
    
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
     // Whatever code is here will get executed for reader.
     //  Fetch StuffDto object from database and add it to jobContext 
   //dataToProcess list.
    return RepeatStatus.FINISHED;
    }
}

SampleProcessor.java

   @Component
   public class SampleProcessor  implements Tasklet{
    
       @Autowired
       private JobContext context;
    
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext){
        
         // Whatever code is here will get executed for processor.
        // context.getDataToProcessList();
       // apply business logic and set the data to write.

return RepeatStatus.FINISHED;
    }

writer 类的方法相同。

注意:这里请注意,这里需要自己编写数据库相关的样板代码。但是通过这种方式,您可以更好地控制您的逻辑,而无需担心上下文大小限制。所有数据都将在内存中,因此一旦操作完成,这些数据将被垃圾收集。我希望你能明白我愿意传达的意思。

要了解更多关于 TaskletChunk 的信息,请阅读 this

【讨论】:

  • 感谢您的详细示例。我现在正在尝试实施它。作者是否需要任何特定的实现(例如,您有:“公共类 SampleReader 实现 Tasklet,StepExecutionListener”用于阅读器,“公共类 SampleProcessor 实现 Tasklet”用于处理器。谢谢
  • 是的,Tasklet 的实现是强制性的。您可能会也可能不会实施StepExectuionListener,这取决于您。
  • 谢谢。索拉布。正如你上面建议的那样,我能够使用 JobScope 的自定义工作上下文来完成我需要做的事情。我的设置与您的示例不同,但关键是使用 JobScope 自定义作业上下文。再次感谢。
  • @Saurabh 为什么必须强制执行Tasklet?不能在 Chunk 中使用 JobScope 吗?我面临着类似的问题,我需要根据第一步的作者写入的数据来处理数据,以便对第二步的阅读器进行额外的读取。您能否帮我解答我的问题:stackoverflow.com/questions/68757616/…。谢谢
猜你喜欢
  • 2023-03-09
  • 1970-01-01
  • 2019-06-17
  • 1970-01-01
  • 2019-05-24
  • 1970-01-01
  • 2016-09-19
  • 1970-01-01
  • 2018-09-24
相关资源
最近更新 更多