【问题标题】:Spring batch run multiple jobs in parallelSpring批处理并行运行多个作业
【发布时间】:2019-06-27 12:30:14
【问题描述】:

我是 Spring 批处理的新手,不知道该怎么做..

基本上我有一个 spring 批处理文件,两者都必须并行运行,即当我请求 execute_job1 时,BatchConfig1 必须执行,当我请求 execute_job2 时,BatchConfig2 必须执行。我该怎么做?

控制器

@RestController
public class JobExecutionController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    /**
     * 
     * @return
     */
    @RequestMapping("/execute_job1")
    @ResponseBody
    public void executeBatchJob1() {

    }

    /**
     * 
     * @return
     */
    @RequestMapping("/execute_job2")
    @ResponseBody
    public void executeBatchJob2() {

    }

}

批量配置1

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Step stepOne(){
        return steps.get("stepOne")
                .tasklet(new MyTaskOne())
                .build();
    }

    @Bean
    public Step stepTwo(){
        return steps.get("stepTwo")
                .tasklet(new MyTaskTwo())
                .build();
    }  

    @Bean
    public Job demoJob(){
        return jobs.get("exportUserJob1")
                .incrementer(new RunIdIncrementer())
                .start(stepOne())
                .next(stepTwo())
                .build();
    }
}

批量配置2:

@Configuration
@EnableBatchProcessing
public class BatchConfig2 {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;


    @Bean
    public JdbcCursorItemReader<User> reader() {
        JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT id,name FROM user");
        reader.setRowMapper(new UserRowMapper());
        return reader;
    }

    public class UserRowMapper implements RowMapper<User> {

        @Override
        public User mapRow(ResultSet rs, int rowNum) throws SQLException {
            User user = new User();
            user.setId(rs.getInt("id"));
            user.setName(rs.getString("name"));

            return user;
        }

    }

    @Bean
    public UserItemProcessor processor() {
        return new UserItemProcessor();
    }

    @Bean
    public FlatFileItemWriter<User> writer() {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<User>();
        writer.setResource(new ClassPathResource("users.csv"));
        writer.setLineAggregator(new DelimitedLineAggregator<User>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<User>() {
                    {
                        setNames(new String[] { "id", "name" });
                    }
                });
            }
        });

        return writer;
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").<User, User>chunk(10).reader(reader()).processor(processor())
                .writer(writer()).build();
    }

    @Bean
    public Job exportUserJob() {
        return jobBuilderFactory.get("exportUserJob2").incrementer(new RunIdIncrementer()).flow(step1()).end().build();
    }
}

【问题讨论】:

    标签: spring spring-boot spring-batch


    【解决方案1】:

    您可以使用 JobLauncher 运行作业。控制器代码:

    @RestController
    public class JobExecutionController {
    
    public JobExecutionController(JobLauncher jobLauncher, 
                                  @Qualifier("demoJob") Job demoJob, 
                                  @Qualifier("exportUserJob") Job exportUserJob) {
        this.jobLauncher = jobLauncher;
        this.demoJob = demoJob;
        this.exportUserJob = exportUserJob;
    }
    
    JobLauncher jobLauncher;
    
    Job demoJob;
    
    Job exportUserJob;
    
    
    @RequestMapping("/execute_job1")
    @ResponseBody
    public void executeBatchJob1() {
        try {
            JobExecution jobExecution = jobLauncher.run(demoJob, new JobParameters(generateJobParameter()));
            log.info("Job started in thread :" + jobExecution.getJobParameters().getString("JobThread"));
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
            log.error("Something sent wrong during job execution", e);
        }
    
    }
    
    @RequestMapping("/execute_job2")
    @ResponseBody
    public void executeBatchJob2() {
        try {
            JobExecution jobExecution = jobLauncher.run(exportUserJob, new JobParameters(generateJobParameter()));
            log.info("Job started in thread :" + jobExecution.getJobParameters().getString("JobThread"));
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
            log.error("Something sent wrong during job execution", e);
        }
    
    }
    
    private JobParameters generateJobParameter() {
        Map<String, JobParameter> parameters = new HashMap<>();
        parameters.put("Job start time", new JobParameter(Instant.now().toEpochMilli()));
        parameters.put("JobThread", new JobParameter(Thread.currentThread().getId()));
    
        return new JobParameters(parameters);
    }
    }
    

    为了防止在应用程序启动时启动你的作业,添加到 application.properties 下一个 spring 批处理配置:spring.batch.job.enabled=false

    【讨论】:

    • 让我试试,结果会告诉你
    • com.example.demo.contoller.JobExecutionController 中的字段作业需要一个 bean,但找到了 2 个: - demoJob:由类路径资源中的方法“demoJob”定义 [com/example/demo/ config/BatchConfig.class] - exportUserJob:由类路径资源 [com/example/demo/config/BatchConfig2.class] 中的方法 'exportUserJob' 定义
    • 啊,我用一个 Job bean 进行了测试。我编辑了代码。 @Qualifier 必须在构造函数中。
    猜你喜欢
    • 2015-07-30
    • 2014-09-28
    • 2021-01-15
    • 2013-10-21
    • 2014-06-20
    • 1970-01-01
    • 2021-09-17
    • 1970-01-01
    • 2017-11-03
    相关资源
    最近更新 更多