【问题标题】:Spring Batch Itemreader pass same Object twice to ItemprocessorSpring Batch Itemreader 将相同的对象两次传递给 Itemprocessor
【发布时间】:2019-08-02 08:55:08
【问题描述】:

我有带分区的 Spring 批处理。 gridSize 为 10,因此它将产生 10 个线程。一切都是默认的 Bean 单例。 TaskExecutor 最多 15 个,核心池 10 个。

@Bean
@StepScope
public RepositoryItemReader<CustomObject> reader(${executorContext[from]} from, ${executorContext[to], ${executorContext[partitonId]) {
    LOG.info("Partition ID: {} will process row from: {}  to: {}", partitionId, from, to);
    //here has the right output, say 1 to 10, 10 to 20, include from, exclude to
    RepositoryItemReader reader = new RepositoryItemReader();
    reader.setRepository(objectRepo);
    reader.setMethod("findByProcessedFromAndTo");
    //from here I pass in from and to to do the partition
    //omit sorts, pageSize,  params
    reader.setSaveState(false);
    return reader;
}

这是阅读器,此阅读器将在 DB 中返回 4 行。自定义对象 1 到 4。

@Bean
public class processor implements ItemProcessor() {
   @override
   public Object process(customObject) {
       logger.info(customObject.getId());
       //logic
   }
}

@Bean
Step processStep(){
    //chunk 1
    //item reader
    //item processor
    //item writer
    //build
}

Step partitionStep {
    //partion with gridSize 10,
    //processStep
    //taskExecutor
}

Partition  {
    int start = 1;
    int range = totalCount/gridSize + 1;
    for(i to gridSize){
        ExecutionContext context = New ExecutionContext();
       context.put("from",start);
       context.put("to", start*range);
       start += range;
       context.put("partitionId", i); 
       map.put(PARTITION_KEY, context);      
    }
    return map;
}

示例查询:

select * from Table where rownum >=:from and rownum < :to;

设置非常简单。只是一个分区网格大小为 10 的批处理。

当我运行它时,项目阅读器得到 4 条正确的记录。但是当阅读器将数据传递给项目处理器时,我得到了这样的日志,我正在编数。

Thread 2 processing Object Id: 11 //row 10 to 20
Thread 1 processing Object Id: 1 //row  1 to 10
Thread 4 processing Object Id: 31 //row  30 to 40
Thread 6 processing Object Id: 51 //row  50 to 60

由于我实现了分区并在查询中进行了分区。现在所有线程都应该处理分区集并且不应该处理重复记录,但我仍然遇到同样的问题,一些线程会处理重复记录。

Thread 9 processing Object ID:2
Thread 3 processing Object ID:4
and so on

而当整个作业完成后,db中会有未处理的记录。

我是否遗漏了什么,需要帮助。

【问题讨论】:

  • 你是如何对数据进行分区的?你的分区器是什么?

标签: multithreading hibernate spring-data-jpa spring-batch


【解决方案1】:

预期的行为是 ItemReader 应该只将这 4 个数据传递给 4 个线程

这是您对多线程步骤的期望,而不是分区步骤。

对于分区步骤,设置网格大小是不够的。您需要使用Partitioner 预先定义分区并将其设置在主步骤上,以便每个工作步骤处理一个分区。如果不指定分区器,Spring Batch 无法知道如何对项目进行分区,您的阅读器将用于所有(未定义的)分区,多次读取整个数据集。

【讨论】:

  • 您好,打扰了。我更新了我的问题。我让 itemreader stepScope 从分区中注入执行上下文。项目处理器和项目编写器仍然是单例的。整个想法是每个线程都应该处理自己的分区数据集。
猜你喜欢
  • 2018-06-18
  • 1970-01-01
  • 2021-01-04
  • 2013-09-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多