【发布时间】:2019-08-02 08:55:08
【问题描述】:
我有带分区的 Spring 批处理。 gridSize 为 10,因此它将产生 10 个线程。一切都是默认的 Bean 单例。 TaskExecutor 最多 15 个,核心池 10 个。
@Bean
@StepScope
public RepositoryItemReader<CustomObject> reader(${executorContext[from]} from, ${executorContext[to], ${executorContext[partitonId]) {
LOG.info("Partition ID: {} will process row from: {} to: {}", partitionId, from, to);
//here has the right output, say 1 to 10, 10 to 20, include from, exclude to
RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(objectRepo);
reader.setMethod("findByProcessedFromAndTo");
//from here I pass in from and to to do the partition
//omit sorts, pageSize, params
reader.setSaveState(false);
return reader;
}
这是阅读器,此阅读器将在 DB 中返回 4 行。自定义对象 1 到 4。
@Bean
public class processor implements ItemProcessor() {
@override
public Object process(customObject) {
logger.info(customObject.getId());
//logic
}
}
@Bean
Step processStep(){
//chunk 1
//item reader
//item processor
//item writer
//build
}
Step partitionStep {
//partion with gridSize 10,
//processStep
//taskExecutor
}
Partition {
int start = 1;
int range = totalCount/gridSize + 1;
for(i to gridSize){
ExecutionContext context = New ExecutionContext();
context.put("from",start);
context.put("to", start*range);
start += range;
context.put("partitionId", i);
map.put(PARTITION_KEY, context);
}
return map;
}
示例查询:
select * from Table where rownum >=:from and rownum < :to;
设置非常简单。只是一个分区网格大小为 10 的批处理。
当我运行它时,项目阅读器得到 4 条正确的记录。但是当阅读器将数据传递给项目处理器时,我得到了这样的日志,我正在编数。
Thread 2 processing Object Id: 11 //row 10 to 20
Thread 1 processing Object Id: 1 //row 1 to 10
Thread 4 processing Object Id: 31 //row 30 to 40
Thread 6 processing Object Id: 51 //row 50 to 60
由于我实现了分区并在查询中进行了分区。现在所有线程都应该处理分区集并且不应该处理重复记录,但我仍然遇到同样的问题,一些线程会处理重复记录。
Thread 9 processing Object ID:2
Thread 3 processing Object ID:4
and so on
而当整个作业完成后,db中会有未处理的记录。
我是否遗漏了什么,需要帮助。
【问题讨论】:
-
你是如何对数据进行分区的?你的分区器是什么?
标签: multithreading hibernate spring-data-jpa spring-batch