【发布时间】:2016-05-18 21:48:28
【问题描述】:
简介
我正在尝试使用在 tasklet 中创建的作业参数来创建 tasklet 执行后的步骤。
一个 tasklet 尝试查找一些文件 (findFiles()),如果它找到了一些文件,它会将文件名保存到一个字符串列表中。
在 tasklet 中,我按如下方式传递数据:
chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);
下一步是一个并行流程,每个文件都会执行一个简单的读取器-处理器-写入器步骤(如果您对我如何到达那里感兴趣,请查看我之前的问题:Spring Batch - Looping a reader/processor/writer step)
在构建作业 readFilesJob() 时,最初会使用“假”文件列表创建流程,因为只有在 tasklet 执行后才能知道真实的文件列表。
问题
如何配置我的作业,以便先执行 tasklet,然后使用从 tasklet 生成的文件列表执行并行流程?
我认为归根结底是在运行时的正确时刻用正确的数据加载文件名列表......但是如何?
复制
这是我的简化配置:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
private static final String FLOW_NAME = "flow1";
private static final String PLACE_HOLDER = "empty";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
public List<String> files = Arrays.asList(PLACE_HOLDER);
@Bean
public Job readFilesJob() throws Exception {
List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
Flow flow = flowBuilder
.start(findFiles())
.next(createParallelFlow(steps))
.build();
return jobBuilderFactory.get("readFilesJob")
.start(flow)
.end()
.build();
}
private static Flow createParallelFlow(List<Step> steps){
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(steps.size());
List<Flow> flows = steps.stream()
.map(step ->
new FlowBuilder<Flow>("flow_" + step.getName())
.start(step)
.build())
.collect(Collectors.toList());
return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()]))
.build();
}
private Step createStep(String fileName){
return stepBuilderFactory.get("readFile" + fileName)
.chunk(100)
.reader(reader(fileName))
.writer(writer(filename))
.build();
}
private FileFinder findFiles(){
return new FileFinder();
}
}
研究
How to safely pass params from Tasklet to step when running parallel jobs 的问题和回答建议在阅读器/作者中使用这样的结构:
@Value("#{jobExecutionContext[filePath]}") String filePath
但是,由于在 createParallelFlow() 方法中创建步骤的方式,我真的希望可以将文件名作为字符串传递给读取器/写入器。因此,即使该问题的答案可能是我的问题的解决方案,它也不是所需的解决方案。但是,如果我错了,请不要纠正我。
结束
我正在使用文件名示例来更好地阐明问题。我的问题实际上不是从目录中读取多个文件。我的问题实际上归结为在运行时生成数据并将其传递给下一个动态生成的步骤。
编辑:
添加了 fileFinder 的简化 tasklet。
@Component
public class FileFinder implements Tasklet, InitializingBean {
List<String> fileNames;
public List<String> getFileNames() {
return fileNames;
}
@PostConstruct
public void afterPropertiesSet() {
// read the filenames and store dem in the list
fileNames.add("sample-data1.csv");
fileNames.add("sample-data2.csv");
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// Execution of methods that will find the file names and put them in the list...
chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);
return RepeatStatus.FINISHED;
}
}
【问题讨论】:
标签: java spring-batch jobs spring-java-config late-binding