【Question title】: How to run multiple jobs in Spring Batch using annotations
【Posted】: 2016-03-03 12:06:13
【Question】:

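I am using Spring Boot + Spring Batch (annotation-based configuration) and have a scenario where I need to run 2 jobs.

I have Employee and Salary records that need to be updated using Spring Batch. Following the Spring Batch getting-started tutorial, I configured one batch configuration class for each of the Employee and Salary objects, named BatchConfigurationEmployee and BatchConfigurationSalary respectively.

I defined the ItemReader, ItemProcessor, ItemWriter and Job as described in that tutorial.

When I start my Spring Boot application, only one of the jobs runs, but I want both batch configuration classes to run. How can I achieve this?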

********* BatchConfigurationEmployee.java *************

@Configuration
@EnableBatchProcessing
public class BatchConfigurationEmployee {
    @Bean
    public ItemReader<Employee> reader() {
        return new EmployeeItemReader();
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new EmployeeItemProcessor();
    }

    @Bean   
    public Job Employee(JobBuilderFactory jobs, Step s1) {
        return jobs.get("Employee")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Employee> reader,
                    ItemProcessor<Employee, Employee> processor) {
        return stepBuilderFactory.get("step1")
                .<Employee, Employee> chunk(1)
                .reader(reader)
                .processor(processor)
                .build();
    }
}

Here is the Salary class:

@Configuration
@EnableBatchProcessing
public class BatchConfigurationSalary {
    @Bean
    public ItemReader<Salary> reader() {
        return new SalaryItemReader();
    }

    @Bean
    public ItemProcessor<Salary, Salary> processor() {
        return new SalaryItemProcessor();
    }

    @Bean
    public Job salary(JobBuilderFactory jobs, Step s1) {
        return jobs.get("Salary")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Salary> reader,
                    ItemProcessor<Salary, Salary> processor) {
        return stepBuilderFactory.get("step1")
                .<Salary, Salary> chunk(1)
                .reader(reader)
                .processor(processor)
                .build();
    }
}

【Question comments】:

    Tags: java spring spring-boot spring-batch


    【Answer 1】:

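    Your jobs are not annotated with @Bean, so the Spring context does not know about them.

    Have a look at the JobLauncherCommandLineRunner class. All beans in the Spring context that implement the Job interface are injected into it, and every job found is executed. (This happens in the executeLocalJobs method of JobLauncherCommandLineRunner.)

    If, for some reason, you do not want them as beans in the context, you have to register your jobs with the JobRegistry. (The executeRegisteredJobs method of JobLauncherCommandLineRunner takes care of launching the registered jobs.)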

    By the way, you can control which jobs are launched with the following property:

    # Comma-separated list of job names to execute on startup (for instance `job1,job2`).
    # By default, all Jobs found in the context are executed.
    spring.batch.job.names=
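To make the effect of that property concrete, here is a minimal plain-Java sketch of the selection rule described above: a blank value launches every job, otherwise only the listed names are launched. It is a simplified model and not Spring Boot's actual implementation; the class `JobNameFilterSketch` and the method `selectJobs` are hypothetical names, and the job names `Employee` and `Salary` are taken from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of job selection by a comma-separated name list (not Spring code). */
final class JobNameFilterSketch {

    /**
     * Returns the jobs to launch: all of them when the property is blank,
     * otherwise only those whose name appears in the comma-separated list.
     */
    static List<String> selectJobs(List<String> jobsInContext, String jobNamesProperty) {
        if (jobNamesProperty == null || jobNamesProperty.trim().isEmpty()) {
            return new ArrayList<>(jobsInContext);
        }
        List<String> wanted = new ArrayList<>();
        for (String name : jobNamesProperty.split(",")) {
            wanted.add(name.trim());
        }
        List<String> selected = new ArrayList<>();
        for (String job : jobsInContext) {
            if (wanted.contains(job)) {
                selected.add(job);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("Employee", "Salary");
        System.out.println(selectJobs(jobs, ""));       // prints [Employee, Salary]
        System.out.println(selectJobs(jobs, "Salary")); // prints [Salary]
    }
}
```

In a real application you would only set the property, for example `spring.batch.job.names=Employee,Salary`, and let Spring Boot do the filtering.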

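    【Comments】:

    • Sorry, that was a typo. My real problem is that only one of the classes runs when I start it as a Spring Boot application.
    • Is it always the same job that runs, or does it vary?
    • Yes, it always runs the BatchConfigurationEmployee class.
    • OK, I see. I'll write another answer.
    【Answer 2】: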

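    Bean names must be unique across the whole Spring context.

    In both job configurations you use the same method names to instantiate the reader, writer and processor. The method name is the name used to identify the bean in the context.

    In both job definitions you have reader(), writer() and processor(). They will override each other, and the same applies to the two step1() methods. Give them unique names such as readerEmployee(), readerSalary() and so on.

    That should solve your problem.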
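The collision can be illustrated with a small plain-Java model of a registry keyed by bean name. This is only a sketch of the idea, not Spring's container: `BeanNameCollisionSketch`, `register`, `collidingNames` and `uniqueNames` are hypothetical names, and the model simply keeps whichever definition is registered last, whereas in a real context the surviving definition depends on the registration order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of a bean registry keyed by name (hypothetical, not Spring code). */
final class BeanNameCollisionSketch {

    /** Registers a definition under a name; a later one with the same name replaces the earlier one. */
    static void register(Map<String, String> registry, String beanName, String definedIn) {
        registry.put(beanName, definedIn);
    }

    /** Both configuration classes use the same method names, as in the question. */
    static Map<String, String> collidingNames() {
        Map<String, String> registry = new LinkedHashMap<>();
        for (String config : new String[] {"BatchConfigurationEmployee", "BatchConfigurationSalary"}) {
            register(registry, "reader", config);
            register(registry, "processor", config);
            register(registry, "step1", config);
        }
        return registry;
    }

    /** Each configuration class uses its own bean names, as this answer suggests. */
    static Map<String, String> uniqueNames() {
        Map<String, String> registry = new LinkedHashMap<>();
        for (String suffix : new String[] {"Employee", "Salary"}) {
            String config = "BatchConfiguration" + suffix;
            register(registry, "reader" + suffix, config);
            register(registry, "processor" + suffix, config);
            register(registry, "step" + suffix, config);
        }
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(collidingNames().size()); // prints 3: half of the definitions were replaced
        System.out.println(uniqueNames().size());    // prints 6: every definition survives
    }
}
```

With shared names only three of the six definitions survive, so one configuration class ends up wired to the other's reader, processor and step; with unique names all six remain available.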

    【Comments】:

      【Answer 3】:

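      I think this is also a good way to run multiple jobs.

      I use a JobLauncher to configure and execute the jobs, and independent CommandLineRunner implementations to run them. The runners are ordered to make sure the jobs execute sequentially, as required.

      Apologies for the long post, but I wanted to give a clear picture of what can be achieved with a JobLauncher configuration and multiple command line runners.

      This is the BeanConfiguration I currently have: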

      @Configuration
      public class BeanConfiguration {
      
          @Autowired
          DataSource dataSource;
      
          @Autowired
          PlatformTransactionManager transactionManager;
      
          @Bean(name="jobOperator")
          public JobOperator jobOperator(JobExplorer jobExplorer,
                                         JobRegistry jobRegistry) throws Exception {
              SimpleJobOperator jobOperator = new SimpleJobOperator();
              jobOperator.setJobExplorer(jobExplorer);
              jobOperator.setJobRepository(createJobRepository());
              jobOperator.setJobRegistry(jobRegistry);
              jobOperator.setJobLauncher(jobLauncher());
              return jobOperator;
          }
      
          /**
           * Configures the JobLauncher to execute jobs asynchronously
           * using the ThreadPoolTaskExecutor defined below.
           *
           * @return the configured JobLauncher
           * @throws Exception if the JobRepository cannot be created
           */
          @Bean
          public JobLauncher jobLauncher() throws Exception {
                  SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
                  jobLauncher.setJobRepository(createJobRepository());
                  jobLauncher.setTaskExecutor(taskExecutor());
                  jobLauncher.afterPropertiesSet();
                  return jobLauncher;
          }
      
          // Read the datasource and set in the job repo
          protected JobRepository createJobRepository() throws Exception {
              JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
              factory.setDataSource(dataSource);
              factory.setTransactionManager(transactionManager);
              factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
              //factory.setTablePrefix("BATCH_");
              factory.setMaxVarCharLength(10000);
              return factory.getObject();
          }
      
          @Bean
          public RestTemplateBuilder restTemplateBuilder() {
           return new RestTemplateBuilder().additionalInterceptors(new CustomRestTemplateLoggerInterceptor());
          }
      
          @Bean(name=AppConstants.JOB_DECIDER_BEAN_NAME_EMAIL_INIT)
          public JobExecutionDecider jobDecider() {
              return new EmailInitJobExecutionDecider();
          }
      
          @Bean
          public ThreadPoolTaskExecutor taskExecutor() {
              ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
              taskExecutor.setCorePoolSize(15);
              taskExecutor.setMaxPoolSize(20);
              taskExecutor.setQueueCapacity(30);
              return taskExecutor;
          }
      }
      

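      I have set up the database to store the job execution details in PostgreSQL, so the DatasourceConfiguration looks like this (two different beans for two different profiles/environments):

      @Configuration
      public class DatasourceConfiguration implements EnvironmentAware {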

      private Environment env;
      
      @Bean
      @Qualifier(AppConstants.DB_BEAN)
      @Profile("dev")
      public DataSource getDataSource() {
          HikariDataSource ds = new HikariDataSource();
      
          boolean isAutoCommitEnabled = env.getProperty("spring.datasource.hikari.auto-commit") != null ? Boolean.parseBoolean(env.getProperty("spring.datasource.hikari.auto-commit")):false;
          ds.setAutoCommit(isAutoCommitEnabled);
          // Connection test query is for legacy connections
          //ds.setConnectionInitSql(env.getProperty("spring.datasource.hikari.connection-test-query"));
          ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
          ds.setDriverClassName(env.getProperty("spring.datasource.driver-class-name"));
          long timeout = env.getProperty("spring.datasource.hikari.idleTimeout") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.idleTimeout")): 40000;
          ds.setIdleTimeout(timeout);
          long maxLifeTime = env.getProperty("spring.datasource.hikari.maxLifetime") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.maxLifetime")): 1800000 ;
          ds.setMaxLifetime(maxLifeTime);
          ds.setJdbcUrl(env.getProperty("spring.datasource.url"));
          ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
          ds.setUsername(env.getProperty("spring.datasource.username"));
          ds.setPassword(env.getProperty("spring.datasource.password"));
          int poolSize = env.getProperty("spring.datasource.hikari.maximum-pool-size") != null ? Integer.parseInt(env.getProperty("spring.datasource.hikari.maximum-pool-size")): 10;
          ds.setMaximumPoolSize(poolSize);
      
          return ds;
      }
      
      @Bean
      @Qualifier(AppConstants.DB_PROD_BEAN)
      @Profile("prod")
      public DataSource getProdDatabase() {
          HikariDataSource ds = new HikariDataSource();
      
          boolean isAutoCommitEnabled = env.getProperty("spring.datasource.hikari.auto-commit") != null ? Boolean.parseBoolean(env.getProperty("spring.datasource.hikari.auto-commit")):false;
          ds.setAutoCommit(isAutoCommitEnabled);
          // Connection test query is for legacy connections
          //ds.setConnectionInitSql(env.getProperty("spring.datasource.hikari.connection-test-query"));
          ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
          ds.setDriverClassName(env.getProperty("spring.datasource.driver-class-name"));
          long timeout = env.getProperty("spring.datasource.hikari.idleTimeout") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.idleTimeout")): 40000;
          ds.setIdleTimeout(timeout);
          long maxLifeTime = env.getProperty("spring.datasource.hikari.maxLifetime") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.maxLifetime")): 1800000 ;
          ds.setMaxLifetime(maxLifeTime);
          ds.setJdbcUrl(env.getProperty("spring.datasource.url"));
          ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
          ds.setUsername(env.getProperty("spring.datasource.username"));
          ds.setPassword(env.getProperty("spring.datasource.password"));
          int poolSize = env.getProperty("spring.datasource.hikari.maximum-pool-size") != null ? Integer.parseInt(env.getProperty("spring.datasource.hikari.maximum-pool-size")): 10;
          ds.setMaximumPoolSize(poolSize);
      
          return ds;
      }
      
      @Override
      public void setEnvironment(Environment environment) {
          this.env = environment;
      }
      

      }

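      Make sure the application's main class captures the application context that is returned once the job executions have terminated (failed or completed), so that you can shut down the JVM gracefully. Otherwise, using the JobLauncher keeps the JVM alive even after all jobs have completed.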

      @SpringBootApplication
      @ComponentScan(basePackages="com.XXXX.Feedback_File_Processing.*")
      @EnableBatchProcessing
      public class FeedbackFileProcessingApp 
      {
          public static void main(String[] args) throws Exception {
              ApplicationContext appContext = SpringApplication.run(FeedbackFileProcessingApp.class, args);
              // The batch job has finished by this point because the 
              //   ApplicationContext is not 'ready' until the job is finished
              // Also, use System.exit to force the Java process to finish with the exit code returned from the Spring App
              System.exit(SpringApplication.exit(appContext));
          }
      
      }
      

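      And so on: you can configure your own deciders and your own jobs/steps in two separate configurations, as you described above, and use them separately in the command line runners, as shown below. (Since this post is getting long, I only give the details of the jobs and the command line runners.)

      Here are the two jobs: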

      @Configuration
      public class DefferalJobConfiguration {
      
          @Autowired
          JobLauncher joblauncher;
      
          @Autowired
          private JobBuilderFactory jobFactory;
      
          @Autowired
          private StepBuilderFactory stepFactory;
      
          @Bean
          @StepScope
          public Tasklet newSampleTasklet() {
              // Tasklet.execute receives a StepContribution and a ChunkContext
              return (contribution, chunkContext) -> {
                  System.out.println("execution of step after flow");
                  return RepeatStatus.FINISHED;
              };
          }
      
          @Bean
          public Step sampleStep() {
              return stepFactory.get("sampleStep").listener(new CustomStepExecutionListener())
                      .tasklet(newSampleTasklet()).build();
          }
      
          @Autowired
          @Qualifier(AppConstants.FLOW_BEAN_NAME_EMAIL_INITIATION)
          private Flow emailInitFlow;
      
          @Autowired
          @Qualifier(AppConstants.JOB_DECIDER_BEAN_NAME_EMAIL_INIT)
          private JobExecutionDecider jobDecider;
      
          @Autowired
          @Qualifier(AppConstants.STEP_BEAN_NAME_ITEMREADER_FETCH_DEFERRAL_CONFIG)
          private Step deferralConfigStep;
      
          @Bean(name=AppConstants.JOB_BEAN_NAME_DEFERRAL)
          public Job deferralJob() {
              return jobFactory.get(AppConstants.JOB_NAME_DEFERRAL)
                      .start(emailInitFlow)
                      .on("COMPLETED").to(sampleStep())
                      .next(jobDecider).on("COMPLETED").to(deferralConfigStep)
                      .on("FAILED").fail()
                      .end().build();
      
      
          }
      }
      
      
      
      @Configuration
      public class TestFlowJobConfiguration {
      
          @Autowired
          private JobBuilderFactory jobFactory;
      
          @Autowired
          @Qualifier("testFlow")
          private Flow testFlow;
      
          @Bean(name = "testFlowJob")
          public Job testFlowJob() {
      
              return jobFactory.get("testFlowJob").start(testFlow).end().build();
          }
      }
      

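      Here are the command line runners (I make sure the first job completes before the second job is initialized, but it is entirely up to the user to execute them in parallel using a different strategy):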

      @Component
      @Order(1)
      public class DeferralCommandLineRunner implements CommandLineRunner, EnvironmentAware{
          // If no custom JobLauncher is defined, jobs are (I assume) launched by a
          // SimpleJobLauncher with default configuration; hence the JobLauncher is
          // customized with the values set in BeanConfiguration.
          private Environment env;
      
          @Autowired
          JobLauncher jobLauncher;
      
          @Autowired
          @Qualifier(AppConstants.JOB_BEAN_NAME_DEFERRAL)
          Job deferralJob;
      
          @Override
          public void run(String... args) throws Exception {
              // TODO Auto-generated method stub
              JobParameters jobparams = new JobParametersBuilder()
                      .addString("run.time", LocalDateTime.now().
                              format(DateTimeFormatter.ofPattern(AppConstants.JOB_DATE_FORMATTER_PATTERN)).toString())
                      .addString("instance.name", 
                              (deferralJob.getName() != null) ?deferralJob.getName()+'-'+UUID.randomUUID().toString() :
                                  UUID.randomUUID().toString())
                      .toJobParameters();
              jobLauncher.run(deferralJob, jobparams);
          }
      
          @Override
          public void setEnvironment(Environment environment) {
              // TODO Auto-generated method stub
              this.env = environment;
          }
      
      }
      
      
      
      @Component
      @Order(2)
      public class TestJobCommandLineRunner implements CommandLineRunner {
      
          @Autowired
          JobLauncher jobLauncher;
      
          @Autowired
          @Qualifier("testFlowJob")
          Job testjob;
      
          @Autowired
          @Qualifier("jobOperator")
          JobOperator operator;
      
          @Override
          public void run(String... args) throws Exception {
              // TODO Auto-generated method stub
              JobParameters jobParam = new JobParametersBuilder().addString("name", UUID.randomUUID().toString())
                      .toJobParameters();
              System.out.println(operator.getJobNames());
              try {
                  Set<Long> deferralExecutionIds = operator.getRunningExecutions(AppConstants.JOB_NAME_DEFERRAL);
                  System.out.println("deferralExecutionIds:" + deferralExecutionIds);
                  // Stop the running deferral execution, if there is one; calling
                  // iterator().next() on an empty set would throw NoSuchElementException
                  if (!deferralExecutionIds.isEmpty()) {
                      operator.stop(deferralExecutionIds.iterator().next());
                  }
              } catch (NoSuchJobException | NoSuchJobExecutionException | JobExecutionNotRunningException e) {
                  // replace with proper logging
                  System.out.println("exception caught:" + e.getMessage());
              }
              jobLauncher.run(testjob, jobParam);
          }
      
      }
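
The ordering idea behind the two runners, where a job is handed to a thread pool but the next one starts only after it has finished, can be sketched with plain `java.util.concurrent`. This is an illustrative model under stated assumptions and not the code of this answer: `SequentialLaunchSketch`, `runInOrder` and `job` are hypothetical names, and the stand-in jobs only sleep and return their name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Plain-Java model of launching jobs on a pool one after another (not Spring Batch code). */
final class SequentialLaunchSketch {

    /** Submits each job to the pool and waits for it to finish before submitting the next one. */
    static List<String> runInOrder(List<Callable<String>> jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> finished = new ArrayList<>();
        try {
            for (Callable<String> job : jobs) {
                Future<String> execution = pool.submit(job);
                finished.add(execution.get()); // blocks until this job is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a job", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("job failed", e.getCause());
        } finally {
            pool.shutdown(); // without this, idle pool threads would keep the JVM alive
        }
        return finished;
    }

    /** A stand-in job that sleeps for a while and then reports its name. */
    static Callable<String> job(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    public static void main(String[] args) {
        List<Callable<String>> jobs = new ArrayList<>();
        jobs.add(job("deferralJob", 50));
        jobs.add(job("testFlowJob", 10));
        System.out.println(runInOrder(jobs)); // prints [deferralJob, testFlowJob]
    }
}
```

Submitting all jobs first and collecting the results afterwards would run them in parallel instead, which is the alternative strategy mentioned above.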
      

      I hope this gives you a complete picture of how this can be done. I am using spring-boot-starter-batch:jar:2.0.0.RELEASE.

      【Comments】:
