【问题标题】:Null Pointer Exception on passing the value between steps in Spring Batch在 Spring Batch 中的步骤之间传递值时出现空指针异常
【发布时间】:2020-09-22 19:50:41
【问题描述】:

我正在学习 Spring Batch,我正在做一个示例程序,我需要在其中将值从一个步骤传递到另一个步骤。

场景:我有一个人员表,我从中提取人员详细信息,将几列保存到 DTO(在第 1 步的 ItemWriter 中)并从DTO 到 where 子句上的不同表,以从中提取相关值(在步骤 2 的 ItemReader 中)。最后,我将生成一个包含所有这些值的 CSV。

这是我的代码:

@Bean
    public Job job() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        return jobBuilderFactory.get("readDBJob").incrementer(new RunIdIncrementer()).start(step1()).next(step2())
                .build();
    }

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1").<Person, Person>chunk(500000).reader(itemReader())
            .writer(itemWriter()).listener(promotionListener()).build();
}

@Bean
    public Step step2() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        return stepBuilderFactory.get("step2").<Person, Result>chunk(100)
                .reader(readingObjectItemReader.cursorReader()).writer(itemWriterForStep2()).build();
    }

第 1 步的 ItemWriter:

        @Bean
public ItemWriter<Person> itemWriter() {
    return new ItemWriter<Person>() {

        private StepExecution stepExecution;
        List<personDTO> responseList = null;

        @Override
        public void write(List<? extends Person> items) throws Exception {
            for (Person item : items) {
                personDTO responseObject = new personDTO();
                BeanUtils.copyProperties(item, responseObject);
                if(responseObject != null && responseObject.getPersonId() != null) {
                    if(stepExecution.getExecutionContext().containsKey("personDtoObject")) {
                        responseList = (List<personDTO>) this.stepExecution.getExecutionContext().get("personDtoObject");
                    }
                    responseList.add(responseObject);
                    this.stepExecution.getExecutionContext().put("personDtoObject", responseList);
                }
            }
        }

        @BeforeStep
        public void saveStepExecution(StepExecution stepExecution) { 
            this.stepExecution = stepExecution;
            this.stepExecution.getExecutionContext().put("personDtoObject", new ArrayList<>());
        }
}

作业执行上下文:

    @Bean
    public Object promotionListener() { 
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"personDtoObject"});
        listener.setStrict(true);
        return listener;
    }

这是我尝试在第 2 步 ItemReader 中访问值的方式

公共类 ReadingObjectItemReader 实现 ItemReader {

@Autowired
DataSource dataSource;

private List<personDTO> personDtoList;

String value;


@Override
public personDetails read()
        throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    return null;
}

@Bean
public JdbcCursorItemReader<personDetails> cursorReader() {

    System.out.println("Values from the step 1 " + personDtoList);
    ....
}

@BeforeStep
public void retrieveSharedData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    ExecutionContext jobContext = jobExecution.getExecutionContext();
    personDtoList=  (List<personDTO>) jobContext.get("personDtoObject");
}

}

当我尝试在第 2 步中访问 personDtoList 的值时,我得到了 null。我在步骤 1 完成之前验证了 StepContext 中的值,直到那里一切看起来都很好,但是当尝试在步骤 2 中访问它们时,我得到了 null。

我查看了大多数在线可用资源,但我无法弄清楚我哪里出错了。任何帮助表示赞赏。

提前感谢您的帮助。

【问题讨论】:

    标签: spring spring-boot spring-batch batch-processing spring-framework-beans


    【解决方案1】:

    在第 1 步的项目编写器中,您正在执行以下操作:

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("personDtoObject", responseList);
    

    这意味着您将覆盖每个块的先前列表。您需要做的是从执行上下文中获取列表并在覆盖键之前在其中添加项目。您还需要在步骤边界(即第一个块和最后一个块)添加一些完整性检查,以确保列表已初始化并且在将其放入执行上下文之前它不是null(尤其是最后一个块)。

    编辑:添加促销监听器工作所需的代码更改

    您还需要将promotionListener()方法的返回类型从Object更改为ExecutionContextPromotionListener

    @Bean
    public ExecutionContextPromotionListener promotionListener() { 
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"personDtoObject"});
        listener.setStrict(true);
        return listener;
    }
    

    否则,此 bean 未正确注册为侦听器。这是一个完整的例子:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        private JobBuilderFactory jobBuilderFactory;
        private StepBuilderFactory stepBuilderFactory;
    
        public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            this.jobBuilderFactory = jobBuilderFactory;
            this.stepBuilderFactory = stepBuilderFactory;
        }
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return new ItemWriter<Integer>() {
    
                private StepExecution stepExecution;
    
                @Override
                public void write(List<? extends Integer> items) {
                    List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                    for (Integer item : items) {
                        System.out.println("item = " + item);
                        itemsList.add(item);
                    }
                }
    
                @BeforeStep
                public void saveStepExecution(StepExecution stepExecution) {
                    this.stepExecution = stepExecution;
                    this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
                }
            };
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Integer, Integer>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .listener(promotionListener())
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet((contribution, chunkContext) -> {
                        ExecutionContext executionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
                        List<Integer> items = (List<Integer>) executionContext.get("items");
                        System.out.println("Items read in step1:");
                        for (Integer item : items) {
                            System.out.println("item = " + item);
                        }
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String[]{"items"});
            listener.setStrict(true);
            return listener;
        }
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("job")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
    }
    

    哪个打印:

    item = 1
    item = 2
    item = 3
    item = 4
    Items read in step1:
    item = 1
    item = 2
    item = 3
    item = 4
    

    编辑 2: 添加面向块的步骤的示例

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Consumer;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        private JobBuilderFactory jobBuilderFactory;
        private StepBuilderFactory stepBuilderFactory;
    
        public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            this.jobBuilderFactory = jobBuilderFactory;
            this.stepBuilderFactory = stepBuilderFactory;
        }
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return new ItemWriter<Integer>() {
    
                private StepExecution stepExecution;
    
                @Override
                public void write(List<? extends Integer> items) {
                    List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                    for (Integer item : items) {
                        System.out.println("item = " + item);
                        itemsList.add(item);
                    }
                }
    
                @BeforeStep
                public void saveStepExecution(StepExecution stepExecution) {
                    this.stepExecution = stepExecution;
                    this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
                }
            };
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Integer, Integer>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .listener(promotionListener())
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .<Integer, Integer>chunk(2)
                    .reader(new ReadingObjectItemReader())
                    .writer(items -> items.forEach((Consumer<Integer>) integer -> System.out.println("integer = " + integer)))
                    .build();
        }
    
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String[]{"items"});
            listener.setStrict(true);
            return listener;
        }
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("job")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
        public static class ReadingObjectItemReader implements ItemReader<Integer> {
    
            int i = 0;
            private List<Integer> items;
    
            @Override
            public Integer read() {
                if (i >= items.size()) {
                    return null;
                } else {
                    return items.get(i++);
                }
            }
    
            @BeforeStep
            public void retrieveSharedData(StepExecution stepExecution) {
                JobExecution jobExecution = stepExecution.getJobExecution();
                ExecutionContext jobContext = jobExecution.getExecutionContext();
                items =  (List<Integer>) jobContext.get("items");
            }
        }
    
    }
    

    打印:

    item = 1
    item = 2
    item = 3
    item = 4
    integer = 1
    integer = 2
    integer = 3
    integer = 4
    

    这意味着该列表已在步骤 2 中从作业执行上下文中正确检索到,这是一个面向块的步骤。

    【讨论】:

    • 我根据您的建议编辑了我的代码。它仍然不起作用。我已经使用 itemWriter 方法编辑了帖子,其中包含您提到的更改。如果我错过了什么,请告诉我。谢谢
    • 您需要将promotionListener()方法的返回类型从Object更改为ExecutionContextPromotionListener。我用一个完整的例子相应地编辑了答案,如果有帮助请接受:stackoverflow.com/help/someone-answers。谢谢。
    • 您好,感谢您使用 Tasklets 方法。它工作正常,但同样使用块在第 2 步的 ItemReader 上抛出 null。我不确定为什么它不能使用块方法工作。你能告诉我在块方法上的代码出了什么问题吗?谢谢。
    • 我不知道您的示例有什么问题,因为我无法运行它。但是同样的方法也应该适用于面向块的步骤。我用第二个代码示例编辑了答案。
    猜你喜欢
    • 2015-07-29
    • 1970-01-01
    • 2023-03-31
    • 1970-01-01
    • 2011-03-15
    • 1970-01-01
    • 2019-05-24
    • 2012-07-23
    • 2014-06-10
    相关资源
    最近更新 更多