【问题标题】:How to handle stateful item reader in SpringBatch如何在 Spring Batch 中处理有状态的 itemreader
【发布时间】:2016-05-03 15:07:48
【问题描述】:

我们的 SpringBatch 作业只有一个包含 ItemReader、ItemProcessor 和 ItemWriter 的步骤。我们正在使用不同的参数同时运行相同的作业。 ItemReader 是有状态的,因为它包含从中读取的输入流。

因此,我们不希望每个 JobInstance(Job + 参数)调用都使用相同的 ItemReader 实例。

我不太确定这种情况下哪个是最好的“范围界定”。

1) Step 是否应该使用@JobScope 进行注释,并且 ItemReader 是原型?

2) Step 是否应该用@StepScope 注释,并且 ItemReader 是原型?

3) Step 和 ItemReader 都应该注解为 Prototype 吗?

最终结果应该是为每个具有不同标识参数的新作业执行创建一个新的 ItemReader(即,为每个新的 JobInstance)。

谢谢。 -AP_

【问题讨论】:

    标签: spring-batch


    【解决方案1】:

    从类实例化的角度来看(从最少实例到最多实例):

    • 单例(每个 JVM)
    • JobScope(每个作业)
    • StepScope(每步)
    • 原型(每个参考)

    如果您在单个 JVM 中运行多个作业(假设您不在分区 Step 中,JobScope 就足够了。如果您有分区 step,您将需要 StepScope。原型在所有情况下都会过大。

    但是,如果这些作业在不同的 JVM 中启动(而不是分区步骤),那么一个简单的 Singleton bean 就可以了。

    【讨论】:

      【解决方案2】:

      不需要每个组件(Step、ItemReader、ItemProcessor、ItemWriter)都必须是弹簧组件。例如,使用 SpringBatch-JavaApi,只有您的 Job 需要是 SpringBean,而不是您的 Steps、Readers 和 Writers:

          @Autowired
      private JobBuilderFactory jobs;
      
      @Autowired
      private StepBuilderFactory steps;
      
      @Bean
      public Job job() throws Exception {
          return this.jobs.get(JOB_NAME) // create jobbuilder
                  .start(step1()) // add step 1
                  .next(step2()) // add step 2
                  .build(); // create job
      }
      
      @Bean
      public Job job() throws Exception {
          return this.jobs.get(JOB_NAME) // create jobbuilder
                  .start(step1(JOB_NAME)) // add step 1
                  .next(step2(JOB_NAME)) // add step 2
                  .build(); // create job
      }
      
      private Step step1(String jobName) throws Exception {
      
          return steps.get(jobName + "_Step_1").chunk(10) //
                  .faultTolerant() //
                  .reader(() -> null) // you could lambdas
                  .writer(items -> {
                  }) //
                  .build();
      }
      
      private Step step2(String jobName) throws Exception {
          return steps.get(jobName + "_Step_2").chunk(10) //
                  .faultTolerant() //
                  .reader(createDbItemReader(ds, sqlString, rowmapper)) //
                  .writer(createFileWriter(resource, aggregator)) //
                  .build();
      }
      

      唯一需要注意的是,在创建 JdbcCurserItemReader、FlatFileItemReader/Writer 等实例时,必须调用“afterPropertiesSet”方法:

          private static <T> ItemReader<T> createDbItemReader(DataSource ds, String sql, RowMapper<T> rowMapper) throws Exception {
          JdbcCursorItemReader<T> reader = new JdbcCursorItemReader<>();
      
          reader.setDataSource(ds);
          reader.setSql(sql);
          reader.setRowMapper(rowMapper);
      
          reader.afterPropertiesSet(); // don't forget
          return reader;
      }
      
      private static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
          FlatFileItemWriter<T> writer = new FlatFileItemWriter();
      
          writer.setEncoding("UTF-8");
          writer.setResource(target);
          writer.setLineAggregator(aggregator);
      
          writer.afterPropertiesSet(); // don't forget
          return writer;
      }
      

      这样,您就无需为 Scopes 烦恼了。每个 Job 都有自己的 Step 实例及其 Readers 和 Writers。

      这种方法的另一个优点是您现在可以完全动态地创建作业。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2012-02-08
        • 1970-01-01
        • 2021-07-31
        • 1970-01-01
        • 1970-01-01
        • 2015-07-06
        • 2019-10-05
        • 1970-01-01
        相关资源
        最近更新 更多