实现此目的的一种方法是通过Partitioning。如果您希望跟踪哪些customersIds 已完成,这种方法似乎更好,因为每个客户 ID 都会有一个从属步骤。
步骤
1.首先通过实现org.springframework.batch.core.partition.support.Partitioner 接口创建您的分区器类并为每个客户ID 填充Map<String, ExecutionContext>。
由于您是按客户 ID 进行分区,因此方法参数 gridSize 将不会用于您的案例。
代码如下所示,其中allCustomers 是您从数据库中准备的列表。
类 - CustomerPartitioner
Map<String, ExecutionContext> result = new HashMap<>();
int partitionNumber = 0;
for (String customer: allCustomers) {
ExecutionContext value = new ExecutionContext();
value.putString("customerId", customer);
result.put("Customer Id [" + customer+ "] : THREAD "
+ partitionNumber, value);
partitionNumber++;
}
2.根据主步骤和从步骤修改您的步骤定义。请参阅在线教程。
示例代码将与此类似。
@Bean
public Step customerPartitionerStep() throws Exception {
return step.get("customerPartitionerStep")
.partitioner(shipmentFactsStep())
.partitioner("shipmentFactsStep", customerPartitioner())
.gridSize(partitionerGridSize).taskExecutor(taskExecutor())
.build();
}
@Bean
public Step shipmentFactsStep() {
return stepBuilderFactory.get("shipmentFactsStep")
.<Shipmentfacts, Shipmentfacts>chunk(10000)
.reader(shipmentfactsItemReader())
.processor(shipmentFactProcessor())
.writer(shipmentFactsWriter())
.build();
}
@Bean
public Partitioner customerPartitioner() {
return new CustomerPartitioner();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
return simpleTaskExecutor;
}
您可以将partitionerGridSize 设置为任何值,因为它没有在您的分区器实现中使用。您可以稍后使用它来根据总记录数而不是仅客户 ID 进行分区。
3.在上面第 2 步的代码中,设置 concurrencyLimit=1 非常重要。这样一次只能运行一个客户,并且它将为您在第 1 步在地图中输入的所有客户运行。通过设置此值,您可以并行运行任意数量的客户。
4.customerId 来自第 1 步的分区器,可以通过以下方式在读取器、处理器等中访问
@Bean
@StepScope
public ItemReader<ReadBean> shipmentfactsItemReader(
@Value("#{stepExecutionContext[customerId]}" String customerId){
..
}
注意注释,@StepScope ..这是此值绑定所必需的。此外,在您的读者定义中,您需要像这样传递null - .reader(shipmentfactsItemReader(null))
在您的 Spring Batch 元数据中,您将拥有与客户数量一样多的步骤加上一个主步骤。当所有从属步骤完成后,主步骤将结束。
这里的优势是,如果需要,您可以并行处理多个客户,并且客户的每个从属步骤都将在其自己的单独线程中运行。
希望对你有帮助!!