【问题标题】:Spring Batch Parallel Processing - File Splitting at runtimeSpring Batch Parallel Processing - 运行时文件拆分
【发布时间】:2018-08-07 17:57:33
【问题描述】:

我有一个简单的 Spring Batch,它从文件中读取 100 万条记录并将其打印在控制台上。

现在,我想在 N 个服务器上部署这批,比如 N=5。

如何确保所有服务器实例没有读取相同的记录?

如 - 我如何适当地拆分文件中的记录 (100 万 / 5) 以实现优化结果?

请提供代码示例。 谢谢。

【问题讨论】:

  • 您是要读取单个文件还是拆分文件。我建议拆分文件...
  • 从一个非常大的文件中读取。
  • 如果我继续拆分文件 - 是否有任何 Spring 批处理功能可以自动为我完成。如-我的批处理服务器的第二个实例如何知道要选择哪个拆分文件?如果没有 Spring 批处理功能 .. 那么我可以将文件拆分并传递到一个开箱即用的项目中。
  • 没有。 Spring Batch 不支持拆分文件。无论如何,有一些操作系统级别的工具通常可以做得更好。 Spring Batch 可以通过 SystemCommandTasklet... 运行其中之一
  • 那么,由哪个批处理服务器实例选择哪个拆分文件 - 我需要接受这个吗? .. 喜欢传递参数?

标签: spring spring-batch


【解决方案1】:

按照 Michael 的建议,您可以使用系统命令拆分文件,然后使用 MultiResourcePartitioner 并行处理拆分的文件。 我就是这样做的

@Bean
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        ClassLoader cl = this.getClass().getClassLoader();
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
        Resource[] resources = resolver.getResources("file:" + filePath + "/"+"*.csv");     
        partitioner.setResources(resources);
        partitioner.partition(10);      
        return partitioner;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }   

    @Bean
    @Qualifier("masterStep")
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(ProcessDataStep())
                .partitioner("ProcessDataStep",partitioner())   
                .taskExecutor(taskExecutor())
                .listener(pcStressStepListener)
                .build();
    }


    @Bean
    @Qualifier("processData")
    public Step processData() {
        return stepBuilderFactory.get("processData")
                .<pojo, pojo> chunk(5000)
                .reader(reader)             
                .processor(processor())
                .writer(writer)         
                .build();
    }



    @Bean(name="reader")
    @StepScope
    public FlatFileItemReader<pojo> reader(@Value("#{stepExecutionContext['fileName']}") String filename) {

        FlatFileItemReader<pojo> reader = new FlatFileItemReader<>();
        reader.setResource(new UrlResource(filename));
        reader.setLineMapper(new DefaultLineMapper<pojo>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(FILE HEADER);


                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<pojo>() {
                    {
                        setTargetType(pojo.class);
                    }
                });
            }
        });
        return reader;
    }   

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-31
    • 2015-02-21
    相关资源
    最近更新 更多