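[Posted]: 2021-09-21 09:45:12
[Question]:
I need your help!
I have a Spring Batch application and I need to pass job parameters to my Quartz job. This is my JobLauncher, where I want to pass each row of the list as job parameters: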
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            JdbcTemplate Jd = new JdbcTemplate(ds);
            List<QuartzParameters> list = null;
            list = Jd.query("select * from FLUX_INFO", new QuartzParametersMapper());
            // Launch the job once per row, passing the row's columns as job parameters
            for (QuartzParameters i : list) {
                Job job = jobLocator.getJob(jobName);
                JobParametersBuilder JP = new JobParametersBuilder();
                JP.addString("PAYS", i.getPAYS());
                JP.addString("CRON", i.getCRON());
                JP.addString("CLASSAPP", i.getCLASSAPP());
                JobParameters paramJobParameters = JP.toJobParameters();
                JobExecution jobExecution = jobLauncher.run(job, paramJobParameters);
                log.info("{}_{} was completed successfully", job.getName(), jobExecution.getId());
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause is not lost
            log.error("Encountered job execution exception!", e);
        }
    }
}
This is my batch configuration class:
@Value("#{jobParameters[CLASSAPP]}")
private String ClassApp;

@Scope("step")
@Bean
public JdbcCursorItemReader<FichierEclate> readerDB() {
    JdbcCursorItemReader<FichierEclate> reader = new JdbcCursorItemReader<FichierEclate>();
    reader.setDataSource(ds);
    reader.setSql(query(ClassApp));
    reader.setRowMapper(new FichierEclateRowMapper());
    return reader;
}
I also use another parameter in the Quartz configuration:
@Value("#{jobParameters[CRON]}")
private String CRON_EXPRESSION;

@StepScope
@Bean
public JobDetailFactoryBean jobDetailFactoryBean() {
    JobDetailFactoryBean factory = new JobDetailFactoryBean();
    factory.setJobClass(QuartzJobLauncher.class);
    Map<String, Object> map = new HashMap<>();
    map.put("jobName", "JobFinal");
    map.put("jobLauncher", jobLauncher);
    map.put("jobLocator", jobLocator);
    map.put("CRON_EXPRESSION", CRON_EXPRESSION);
    factory.setJobDataAsMap(map);
    factory.setGroup("etl_group");
    factory.setName("etl_job");
    return factory;
}
I also use the job parameter in the processor class:
@Value("#{jobParameters[CLASSAPP]}")
private String ClassApp;

@Scope("step")
@Override
public TdfFile process(FichierEclate item) throws Exception {
    // some code that uses the CLASSAPP variable
}
I tried using @StepScope but got the same problem! This is the error I get:
Error creating bean with name 'batchConfig': Unsatisfied dependency expressed through field 'ClassApp'; nested exception is org.springframework.beans.factory.BeanExpressionException: Expression parsing failed; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'jobParameters' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext' - maybe not public or not valid?
Here is the QuartzConfiguration class:
@Configuration
public class QuartzConfiguration {

    @Value("#{jobParameters[CRON]}")
    private String CRON_EXPRESSION;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    public QuartzConfiguration() {
        super();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public JobDetailFactoryBean jobDetailFactoryBean() {
        JobDetailFactoryBean factory = new JobDetailFactoryBean();
        factory.setJobClass(QuartzJobLauncher.class);
        Map<String, Object> map = new HashMap<>();
        map.put("jobName", "JobFinal");
        map.put("jobLauncher", jobLauncher);
        map.put("jobLocator", jobLocator);
        map.put("CRON_EXPRESSION", CRON_EXPRESSION);
        factory.setJobDataAsMap(map);
        factory.setGroup("etl_group");
        factory.setName("etl_job");
        return factory;
    }

    // Job is scheduled after every 3 minutes
    @Bean
    public CronTriggerFactoryBean cronTriggerFactoryBean() {
        CronTriggerFactoryBean stFactory = new CronTriggerFactoryBean();
        stFactory.setJobDetail(jobDetailFactoryBean().getObject());
        stFactory.setStartDelay(3000);
        stFactory.setName("cron_trigger");
        stFactory.setGroup("cron_group");
        stFactory.setCronExpression(CRON_EXPRESSION);
        // stFactory.getJobDataMap().get(app);
        return stFactory;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean scheduler = new SchedulerFactoryBean();
        scheduler.setTriggers(cronTriggerFactoryBean().getObject());
        return scheduler;
    }
}
Here is the jobLauncher class:
public class QuartzJobLauncher extends QuartzJobBean {

    private static final Logger log = LoggerFactory.getLogger(QuartzJobLauncher.class);

    @Autowired
    private DataSource ds;

    // Populated by Quartz from the JobDataMap through the setters below
    private String jobName;
    private JobLauncher jobLauncher;
    private JobLocator jobLocator;

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public JobLocator getJobLocator() {
        return jobLocator;
    }

    public void setJobLocator(JobLocator jobLocator) {
        this.jobLocator = jobLocator;
    }

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            JdbcTemplate Jd = new JdbcTemplate(ds);
            List<QuartzParameters> list = null;
            list = Jd.query("select * from FLUX_INFO", new QuartzParametersMapper());
            // Launch the job once per row, passing the row's columns as job parameters
            for (QuartzParameters i : list) {
                Job job = jobLocator.getJob(jobName);
                JobParametersBuilder JP = new JobParametersBuilder();
                JP.addString("PAYS", i.getPAYS());
                JP.addString("CRON", i.getCRON());
                JP.addString("CLASSAPP", i.getCLASSAPP());
                JobParameters paramJobParameters = JP.toJobParameters();
                final JobExecution jobExecution = jobLauncher.run(job, paramJobParameters);
                log.info("{}_{} was completed successfully", job.getName(), jobExecution.getId());
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause is not lost
            log.error("Encountered job execution exception!", e);
        }
    }
}
I tried adding @Scope(value="step", proxyMode=ScopedProxyMode.TARGET_CLASS) directly on the classes and on the beans! Neither approach works! Has anyone found a solution to this problem?
[Discussion]:
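A likely reading of the EL1008E error in the question: the @Value("#{jobParameters[...]}") annotation sits on a field of the 'batchConfig' configuration class. That class is a singleton created at application startup, before any step is running, so the expression is evaluated against a plain BeanExpressionContext that has no jobParameters. Putting @Scope("step") on a bean method does not change when the field of the enclosing class is resolved. Below is a minimal sketch of the usual late-binding form, in which the parameter is injected into the step-scoped bean method itself. The class name LateBindingConfig and the tasklet bean printClassApp are hypothetical and only serve as an illustration; the sketch also assumes the batch infrastructure (for example @EnableBatchProcessing) is already configured elsewhere.

```java
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, used only to illustrate late binding.
// Assumption: step scope is registered elsewhere, e.g. via @EnableBatchProcessing.
@Configuration
public class LateBindingConfig {

    // No @Value field on the configuration class: the class is created at
    // startup, when no step (and therefore no jobParameters) exists yet.

    // @StepScope defers creation of this bean until a step is executing,
    // so the jobParameters expression can be resolved at that moment.
    @Bean
    @StepScope
    public Tasklet printClassApp(
            @Value("#{jobParameters['CLASSAPP']}") String classApp) {
        return (contribution, chunkContext) -> {
            // classApp holds the value passed through JobParametersBuilder
            System.out.println("CLASSAPP = " + classApp);
            return RepeatStatus.FINISHED;
        };
    }
}
```

The same pattern should carry over to readerDB() and to the processor bean: drop the @Value field and take the parameter as an argument of the step-scoped @Bean method. The CRON value is probably a different case. The Quartz trigger is built at startup, outside any step, so it cannot come from jobParameters and would more likely have to be read from FLUX_INFO directly when the triggers are created.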
Tags: spring spring-batch parameter-passing batch-processing