【问题标题】:How to run Spring batch job with different parameters?如何使用不同的参数运行 Spring 批处理作业?
【发布时间】:2017-02-20 13:58:20
【问题描述】:

我正在使用带有自定义读取器和写入器的 Spring Batch。

我有一个带有 customerId控制表

我需要多次运行同一个步骤,为我的控制表中的每个客户运行一次。

customerId 应该能够作为参数传递,因为我在阅读器和编写器中都需要它。

如何才能最好地做到这一点?

  @Bean
public Step shipmentFactsStep() {
    return stepBuilderFactory.get("shipmentFactsStep")
            .<Shipmentfacts, Shipmentfacts>chunk(10000)
            .reader(shipmentfactsItemReader())
            .processor(shipmentFactProcessor())
            .writer(shipmentFactsWriter())
            .build();
}

【问题讨论】:

    标签: spring spring-batch


    【解决方案1】:

    实现此目的的一种方法是通过Partitioning。如果您希望跟踪哪些customersIds 已完成,这种方法似乎更好,因为每个客户 ID 都会有一个从属步骤。

    步骤

    1.首先通过实现org.springframework.batch.core.partition.support.Partitioner 接口创建您的分区器类并为每个客户ID 填充Map&lt;String, ExecutionContext&gt;

    由于您是按客户 ID 进行分区,因此方法参数 gridSize 将不会用于您的案例。

    代码如下所示,其中allCustomers 是您从数据库中准备的列表。

    类 - CustomerPartitioner

    Map<String, ExecutionContext> result = new HashMap<>();
    int partitionNumber = 0;
     for (String customer: allCustomers) {
            ExecutionContext value = new ExecutionContext();
            value.putString("customerId", customer);
            result.put("Customer Id [" + customer+ "] : THREAD "
                + partitionNumber, value);
            partitionNumber++;
            }
    

    2.根据主步骤和从步骤修改您的步骤定义。请参阅在线教程。

    示例代码将与此类似。

    @Bean
        public Step customerPartitionerStep() throws Exception {
        return step.get("customerPartitionerStep")
            .partitioner(shipmentFactsStep())
            .partitioner("shipmentFactsStep", customerPartitioner())
            .gridSize(partitionerGridSize).taskExecutor(taskExecutor())
            .build();
        }
    
        @Bean
    public Step shipmentFactsStep() {
        return stepBuilderFactory.get("shipmentFactsStep")
                .<Shipmentfacts, Shipmentfacts>chunk(10000)
                .reader(shipmentfactsItemReader())
                .processor(shipmentFactProcessor())
                .writer(shipmentFactsWriter())
                .build();
    }
    
    @Bean
        public Partitioner customerPartitioner() {
        return new CustomerPartitioner();
        }
    
    @Bean
        public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
        return simpleTaskExecutor;
        }
    

    您可以将partitionerGridSize 设置为任何值,因为它没有在您的分区器实现中使用。您可以稍后使用它来根据总记录数而不是仅客户 ID 进行分区。

    3.在上面第 2 步的代码中,设置 concurrencyLimit=1 非常重要。这样一次只能运行一个客户,并且它将为您在第 1 步在地图中输入的所有客户运行。通过设置此值,您可以并行运行任意数量的客户。

    4.customerId 来自第 1 步的分区器,可以通过以下方式在读取器、处理器等中访问

    @Bean
    @StepScope
    public ItemReader<ReadBean> shipmentfactsItemReader(
            @Value("#{stepExecutionContext[customerId]}" String customerId){
    ..
    }
    

    注意注释,@StepScope ..这是此值绑定所必需的。此外,在您的读者定义中,您需要像这样传递null - .reader(shipmentfactsItemReader(null))

    在您的 Spring Batch 元数据中,您将拥有与客户数量一样多的步骤加上一个主步骤。当所有从属步骤完成后,主步骤将结束。

    这里的优势是,如果需要,您可以并行处理多个客户,并且客户的每个从属步骤都将在其自己的单独线程中运行。

    希望对你有帮助!!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-11-14
      • 2014-06-20
      • 2014-09-28
      • 2021-01-15
      • 2015-07-30
      • 1970-01-01
      • 2017-03-30
      • 1970-01-01
      相关资源
      最近更新 更多