【问题标题】:Spring Batch : Aggregating records and write countSpring Batch:聚合记录和写入计数
【发布时间】:2015-11-20 11:24:36
【问题描述】:

我们在平面文件中有一些数据。例如

EmpCode,Salary,EmpName,...  
100,1000,...,...
200,2000,...,...
200,2000,...,...
100,1000,...,...
300,3000,...,...
400,4000,...,...

我们想根据 EmpCode 汇总工资并写入数据库

Emp_Code    Emp_Salary   Updated_Time   Updated_User 
100         2000         ...            ...
200         4000         ...            ...
300         3000         ...            ...
400         4000         ...            ...

我已经按照 Spring Batch 编写了如下类

ItemReader - to read the employee data into a Employee object                

EmployeeItemProcessor 示例:

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {

    @Override
    public Employee process(Employee employee) throws Exception {
        employee.setUpdatedTime(new Date());
        employee.setUpdatedUser("someuser");
        return employee;
    }

EmployeeItemWriter:

@Repository
public class EmployeeItemWriter implements ItemWriter<Employee> { 
 @Autowired
 private SessionFactory sf;

 @Override  
 public void write(List<? extends Employee> employeeList) throws Exception {  
  List<Employee> aggEmployeeList = aggregateEmpData(employeeList);
  //write to db using session factory
 }  

 private List<Employee> aggregateEmpData(List<? extends Employee> employeeList){
     Map<String, Employee> map = new HashMap<String, Employee>(); 
    for(Employee e: employeeList){
        String empCode =  e.getEmpCode();
        if(map.containsKey(empCode)){
            //get employee salary and add up
         }else{
          map.put(empCode,Employee);
         }
     }    
     return new ArrayList<Employee>(map.values());         
 }
}

XML 配置

...
<batch:job id="employeeJob">
    <batch:step id="step1">
    <batch:tasklet>
        <batch:chunk reader="employeeItemReader" 
            writer="employeeItemWriter" processor="employeeItemProcessor"
            commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
  </batch:job>
...

它正在工作并服务于我的目的。不过,我有几个问题。

1)当我查看日志时,显示如下(commit-interval=100):

status=COMPLETED, exitStatus=COMPLETED, readCount=2652, filterCount=0, writeCount=2652 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=27, rollbackCount=0

但聚合后,只有 2515 条记录写入数据库。 write count 是 2652,是不是因为到达 ItemWriter 的 item 数还是 2652?如何纠正?

2) 我们对列表进行了两次迭代。一次在 ItemProcessor 中,然后在 ItemWriter 中进行聚合。如果记录数较高,则可能是性能问题。有没有更好的方法来实现这一点?

【问题讨论】:

  • 请把ItemReader发给你
  • 嗨,ItemReader 就像任何其他 ItemReader 类一样。绝对没有多余的逻辑。

标签: spring spring-batch


【解决方案1】:

如果输入文件的每一行都是一个员工对象,那么您的 ReadCount 将是输入文件中的行数。 WriteCount 将是传递给项目编写器的所有列表大小的总和。因此,也许您的 aggregateEmpData 函数将一些记录删除或聚合为一个,因此,您的数据库计数与 WriteCount 不同。 如果您想确保 WriteCount 正是数据库中的记录数,您应该在处理器中进行聚合。

【讨论】:

  • 是的,完全正确。聚合函数聚合记录。因此,计数较少。我提到了堆栈溢出中的一个问题,并尝试在 ItemProcessor 中进行聚合,但地图会随着每个项目调用而初始化。你能告诉我在 itemprocessor 中实现它的方法吗?
【解决方案2】:

为什么要聚合在ItemWriter?我会在ItemProcessor 中做到这一点。这将允许写入计数准确,并将该组件与实际写入行为分开。如果您对您的配置提供一些见解,我们可以详细说明。

【讨论】:

  • 嗨迈克尔,当我设法在处理器中进行聚合时,我尝试使用 HibernateItemWriter。这是工作。但是,根据我们的流程,我们会在加载之前删除所有以前的数据。因此,此处不需要 saveOrUpdate。我写了自己的作家只是为了使用 Hibernate 来“保存”数据。我想知道,我们是否可以将 HibernateItemWriter 配置为只保存?
  • 能否将该问题作为单独的问题发布,以便其他人找到答案?
【解决方案3】:

我设法写了它。我是这样做的。

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
    Map<String, Employee> map;
    @Override
    public Employee process(Employee employee) throws Exception {
        employee.setUpdatedTime(new Date());
        employee.setUpdatedUser("someuser");
        String empCode =  employee.getEmpCode();
        if(map.containsKey(empCode)){
            //get employee salary and add up
            return null; 
         }
         map.put(empCode,employee);
         return employee;
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
         map = new HashMap<String, Employee>(); 
    }

写入计数现在显示正确。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多