【问题标题】:Spring Batch - How to generate parallel steps based on params created in a previous stepSpring Batch - 如何根据上一步中创建的参数生成并行步骤
【发布时间】:2016-05-18 21:48:28
【问题描述】:

简介

我正在尝试使用在 tasklet 中创建的作业参数来创建 tasklet 执行后的步骤。

一个 tasklet 尝试查找一些文件 (findFiles()),如果它找到了一些文件,它会将文件名保存到一个字符串列表中。

在 tasklet 中,我按如下方式传递数据: chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);

下一步是一个并行流程,每个文件都会执行一个简单的读取器-处理器-写入器步骤(如果您对我如何到达那里感兴趣,请查看我之前的问题:Spring Batch - Looping a reader/processor/writer step

在构建作业 readFilesJob() 时,最初会使用“假”文件列表创建流程,因为只有在 tasklet 执行后才能知道真实的文件列表。

问题

如何配置我的作业,以便先执行 tasklet,然后使用从 tasklet 生成的文件列表执行并行流程?

我认为归根结底是在运行时的正确时刻用正确的数据加载文件名列表......但是如何?

复制

这是我的简化配置:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());

        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);

        Flow flow = flowBuilder
                .start(findFiles())             
                .next(createParallelFlow(steps))
                .build();       

        return jobBuilderFactory.get("readFilesJob")                
                .start(flow)                
                .end()
                .build();
    }

    private static Flow createParallelFlow(List<Step> steps){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream()
                .map(step ->
                        new FlowBuilder<Flow>("flow_" + step.getName()) 
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) 
             .add(flows.toArray(new Flow[flows.size()]))
             .build();      
    }

    private Step createStep(String fileName){
        return stepBuilderFactory.get("readFile" + fileName)
                .chunk(100)
                .reader(reader(fileName))               
                .writer(writer(filename))                               
                .build();
    }

    private FileFinder findFiles(){
        return new FileFinder();
    }
}

研究

How to safely pass params from Tasklet to step when running parallel jobs 的问题和回答建议在阅读器/作者中使用这样的结构:

@Value("#{jobExecutionContext[filePath]}") String filePath

但是,由于在 createParallelFlow() 方法中创建步骤的方式,我真的希望可以将文件名作为字符串传递给读取器/写入器。因此,即使该问题的答案可能是我的问题的解决方案,它也不是所需的解决方案。但是,如果我错了,请不要纠正我。

结束

我正在使用文件名示例来更好地阐明问题。我的问题实际上不是从目录中读取多个文件。我的问题实际上归结为在运行时生成数据并将其传递给下一个动态生成的步骤。

编辑:

添加了 fileFinder 的简化 tasklet。

@Component
public class FileFinder implements Tasklet, InitializingBean {

    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store dem in the list
        fileNames.add("sample-data1.csv");
        fileNames.add("sample-data2.csv");
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Execution of methods that will find the file names and put them in the list...
        chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);                     
        return RepeatStatus.FINISHED;
    }    
}

【问题讨论】:

    标签: java spring-batch jobs spring-java-config late-binding


    【解决方案1】:

    我不确定,如果我确实正确理解了您的问题,但据我所知,您需要在动态构建工作之前拥有包含文件名的列表。

    你可以这样做:

    @Component
    public class MyJobSetup {
        List<String> fileNames;
    
        public List<String> getFileNames() {
            return fileNames;
        }
    
        @PostConstruct
        public void afterPropertiesSet() {
            // read the filenames and store dem in the list
            fileNames = ....;
        }
    }
    

    之后,您可以将这个 Bean 注入到您的 JobConfiguration Bean 中

    @Configuration
    @EnableBatchProcessing
    @Import(MyJobSetup.class)
    public class BatchConfiguration {
    
        private static final String FLOW_NAME = "flow1";
        private static final String PLACE_HOLDER = "empty";
    
        @Autowired
        private  MyJobSetup jobSetup; // <--- Inject
              // PostConstruct of MyJobSetup was executed, when it is injected
    
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
        public List<String> files = Arrays.asList(PLACE_HOLDER);
    
        @Bean
        public Job readFilesJob() throws Exception {   
            List<Step> steps = jobSetUp.getFileNames() // get the list of files
                 .stream() // as stream
                 .map(file -> createStep(file)) // map...
                 .collect(Collectors.toList()); // and create the list of steps
    

    【讨论】:

    • 再次感谢您的快速回答和帮助,非常有帮助。 fileFinder 实际上是一个 tasklet,它执行一些方法,当它完成并找到文件时,它会将文件名保存在一个列表中。在我的实际应用程序中,创建了一些需要传递给多个读写器的动态 SQL。因此,现在配置的 fileFinder 是工作的一部分。我已经用一个与我的实际 SQL 创建者小任务非常相似的示例小任务编辑了这个问题。我希望这能澄清我的问题。这样可以吗?
    • 我看不出您为什么要将 FileFinder 放入作业本身(作为 tasklet),但这可能是因为我不完全理解您需要解决的问题。当您在“BatchConfiguration”类中构建作业时,您应该拥有完全构建作业所需的所有信息。您应该确切地知道,您将有多少步骤,所有 FileReader 的文件名是什么,您的 JDBCItemWriter 的 SQL 的外观等。即使作业是动态构建的并且取决于例如文件的数量在运行时存在于目录中。
    • 应用程序生成 SQL 语句,然后使用这些 SQL 提取数据。我最初的想法是把它变成一个可以启动的工作。我使用 JobExecutionDecider 让它工作,但不幸的是,它只是在循环中执行 reader-proc-writer 并且对于许多 SQL 来说是不可扩展的。也许 BatchConfiguration 类应该包含两个单独的作业。首先是 generateSQL 作业,然后是 extractData 作业。在第一个作业执行后,所有信息都会出现以动态生成第二个作业。您对这种方法有何看法?
    • 为什么要把SQL语句的生成放到一个job里面呢?如上所述,您可以在构建工作之前执行此操作。然后,当您创建/计算 SQL 语句时,您可以动态创建作业并为每个计算的 SQL 添加一个步骤,我的意思是,您在 springcontext 中,所有实例化的 bean 都已准备好并可以注入您的 BatchConfiguration 类. Job 中是否有任何功能可以使 SQL 语句的生成比在纯 Spring bean 中生成要容易得多?
    • 您不必以不同的方式启动应用程序。为了使您的 Tasklet 成为“正常”的 Spring bean,只需在“implements”之后删除“Tasklet”即可。然后在您的 BatchConfiguration 类中使用“@Autowired”注入它。根据您的上下文的配置方式,您可能必须添加一个“@Import”注释(参见我上面编辑的示例)。而已。 Spring 上下文负责首先执行您的 SQL 生成类(以前称为 FileFinder-Tasklet),并在将其注入 BatchConfiguration 类之前调用​​其 PostConstruct 方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-08-30
    • 2018-06-20
    • 1970-01-01
    • 2016-05-14
    • 2018-09-27
    • 2019-12-17
    • 1970-01-01
    相关资源
    最近更新 更多