【问题标题】:Spring batch : Assemble a job rather than configuring it (Extensible job configuration)Spring批处理:组装作业而不是配置作业(可扩展作业配置)
【发布时间】:2016-06-06 11:10:33
【问题描述】:

背景

我正在设计一个文件读取层,它可以读取分隔文件并将其加载到List 中。我决定使用 Spring Batch 是因为它提供了许多可扩展性选项,我可以根据文件的大小将它们用于不同的文件集。

要求

  1. 我想设计一个通用的 Job API,可以用来读取任何分隔文件。
  2. 应该有一个单独的作业结构用于解析每个分隔文件。例如,如果系统需要读取 5 个文件,就会有 5 个作业(每个文件一个)。 5 个作业彼此不同的唯一方式是它们将使用不同的FieldSetMapper、列名、目录路径和其他缩放参数,例如commit-intervalthrottle-limit
  3. 此 API 的用户不需要配置 Spring 当系统中引入新的文件类型时,他自己的批处理作业、步骤、分块、分区等。
  4. 用户只需提供作业使用的FieldsetMappercommit-intervalthrottle-limit 以及每种类型文件的存放目录。
  5. 每个文件将有一个预定义目录。每个目录可以包含多个相同类型和格式的文件。 MultiResourcePartioner 将用于查看目录内部。分区数 = 目录中的文件数。

我的要求是构建一个 Spring Batch 基础架构,它可以为我提供一份独特的工作,一旦我掌握了构成该工作的点点滴滴,我就可以启动它。

我的解决方案:

我创建了一个抽象配置类,它将被具体配置类扩展(每个文件将有 1 个具体类要读取)。

    @Configuration
    @EnableBatchProcessing
    public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";

    @Autowired
    JobBuilderFactory jobs;

    @Autowired
    ResourcePatternResolver resourcePatternResolver;

    public final Job createJob(Step s1, JobExecutionListener listener) {
        return jobs.get(this.getClass().getSimpleName())
                .incrementer(new RunIdIncrementer()).listener(listener)
                .start(s1).build();
    }

    public abstract Job loaderJob(Step s1, JobExecutionListener listener);

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String[] getColumnNames();

    public abstract int getChunkSize();

    public abstract int getThrottleLimit();

    @Bean
    @StepScope
    @Value("#{stepExecutionContext['fileName']}")
    public FlatFileItemReader<T> reader(String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here
        return null;
    }

    @Bean
    @JobScope
    public ListItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }

    @Bean
    @JobScope
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {

        final Step readerStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:slave")
                .<T, T> chunk(getChunkSize()).reader(reader)
                .processor(processor).writer(writer).taskExecutor(taskExecutor)
                .throttleLimit(getThrottleLimit()).build();

        final Step partitionedStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(
                        this.getClass().getSimpleName() + " ReadStep:slave",
                        partitioner()).taskExecutor(taskExecutor).build();

        return partitionedStep;

    }

    /*
     * @Bean public TaskExecutor taskExecutor() { return new
     * SimpleAsyncTaskExecutor(); }
     */

    @Bean
    @JobScope
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    @JobScope
    public JobExecutionListener listener(ListItemWriter<T> writer) {
        return new JobCompletionNotificationListener<T>(writer);
    }

    /*
     * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
     * change the return type of writer to ListItemWriter for this to work.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(final Runnable task) {
                // gets the jobExecution of the configuration thread
                final JobExecution jobExecution = JobSynchronizationManager
                        .getContext().getJobExecution();
                super.doExecute(new Runnable() {
                    public void run() {
                        JobSynchronizationManager.register(jobExecution);

                        try {
                            task.run();
                        } finally {
                            JobSynchronizationManager.close();
                        }
                    }
                });
            }
        };
    }

}

假设为了讨论,我必须阅读发票数据。因此,我可以扩展上述类来创建InvoiceLoader

@Configuration
public class InvoiceLoader extends AbstractFileLoader<Invoice>{

    private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {

        public Invoice mapFieldSet(FieldSet f) {
            Invoice invoice = new Invoice();
            invoice.setNo(f.readString("INVOICE_NO");
            return e;
        }
    }

    @Override
    public FieldSetMapper<Invoice> getFieldSetMapper() {
        return new InvoiceFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/invoices/partitions/";
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "INVOICE_NO", "DATE"};
    }


    @Override
    @Bean(name="invoiceJob")
    public Job loaderJob(Step s1,
            JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 25254;
    }

    @Override
    public int getThrottleLimit() {
        return 8;
    }

}

假设我还有一个名为 Inventory 的类扩展了 AbstractFileLoader.

在应用启动时,我可以如下加载这两个注解配置:

AbstractApplicationContext context1 = new   AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);

在我的应用程序的其他地方,两个不同的线程可以按如下方式启动作业:

线程 1:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("invoiceJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

线程 2:

    JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
    Job job1 = context1.getBean("inventoryJob", Job.class);
    JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);

这种方法的优点是每次有新文件要读取时,开发人员/用户所要做的就是继承AbstractFileLoader 并实现所需的抽象方法,而无需详细了解如何组装工作。

问题:

  1. 我是 Spring 批处理的新手,所以我可能忽略了这种方法的一些不太明显的问题,例如 Spring 批处理中的共享内部对象可能导致两个作业一起运行失败或明显的问题,例如豆子。
  2. 有没有更好的方法来实现我的目标?
  3. @Value("#{stepExecutionContext['fileName']}")fileName 属性始终被赋值为I:/CK/invoices/partitions/,这是InvoiceLoadergetPath 方法返回的值,即使getPathmethod inInventoryLoader`返回不同的值.

【问题讨论】:

  • 这里的问题在哪里?
  • 对不起,如果我不清楚。问题在最后两段。我想在旅途中组装一个新的Job,而不是我启动一个预配置的工作开发人员将提供FieldSetMapperItemWriter 的子类以及其他参数,例如提交间隔FileLoader。然后FileLoader 应该将这些组件组装成Job 并启动Job。我想代表开发人员组装作业,而不是要求他们配置它们。我的帖子中的第 1 点和第 2 点指定了要求。编辑我的帖子以使其清晰!
  • 那么您希望“用户”为FieldSetMapperItemWriter 等提供bean?
  • @Artefacto 没错。我希望用户只提供FieldSetMapperItemWriter 以及我的帖子中提到的其他参数。我将使用这些组件并组装一个Job 并启动它。所以这里没有预先配置的作业。我必须从提供的组件中组装一个,然后启动它。此外,此方法可能会在应用程序的生命周期内被多次调用,因此每个作业都必须是唯一的,但我应该能够重用诸如JobLauncher 之类的组件。坦率地说,我不知道如何实现这一目标。
  • @Artefacto 组装作业而不是要求“用户”配置它们的原因是,“用户”主要是支持人员,他们知道一点点 Java 代码能够编写简单的 Java 类,如 FieldSetMapper 实现。要求他们配置 Spring batch 工作是不可能的。

标签: spring file annotations spring-batch


【解决方案1】:

一种选择是将它们作为作业参数传递。例如:

@Bean
Job job() {
    jobs.get("myJob").start(step1(null)).build()
}

@Bean
@JobScope
Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
    steps.get('step1')
            .chunk((int) commitInterval)
            .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
            .writer(writer(null))
            .build()
}

@Bean
@JobScope
ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
    applicationContext.classLoader.loadClass(writerClass).newInstance()
}

MyWriter:

class MyWriter implements ItemWriter<Integer> {

    @Override
    void write(List<? extends Integer> items) throws Exception {
        println "Write $items"
    }
}

然后执行:

def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
        commitInterval: new JobParameter(3),
        writerClass: new JobParameter('MyWriter'), ]))

输出是:

信息:执行步骤:[step1] 写 [1, 2, 3] 写 [4] 2016 年 2 月 24 日下午 2:30:22 org.springframework.batch.core.launch.support.SimpleJobLauncher$1 运行 信息:作业:[SimpleJob:[name=myJob]] 使用以下参数完成:[{commitInterval=3, writerClass=MyWriter}] 和以下状态:[COMPLETED] 状态为:COMPLETED,作业执行 id 0 #1 步骤 1 完成

完整示例here

【讨论】:

  • 这是有道理的。但是仍然存在重用此代码来创建多个作业的问题。如果我想启动 5 个具有相同配置的作业,我将如何做到这一点而无需触及您的配置类。
  • @CKing 你的意思是 5 个工作实例还是五个不同的工作?在这种情况下,只有一项工作。如果你想要几个类似的工作,你可以通过@Import分享配置。
  • 有道理。我将不得不通过 AnnotationConfigApplicationContext 将这些作业配置类中的每一个加载到它们自己的 Spring 容器中
  • 我已经用另一种方法编辑了这个问题。你的回答给了我一些关于如何推进这一点的想法。我的问题现在可能会让我的目标更加明确。
  • 在尝试了很多选项之后,您的选项似乎是唯一一个在注释方面有效的选项。谢谢:)
猜你喜欢
  • 2015-04-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-02
  • 2018-02-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多