【问题标题】:How to combine multiple listeners (step, read, process, write and skip) in spring batch如何在 Spring Batch 中组合多个侦听器(步骤、读取、处理、写入和跳过)
【发布时间】:2019-07-10 16:00:12
【问题描述】:

此操作的目的是通过多个步骤跟踪 Spring 批处理作业中正在读取/处理/写入的行或项目。

我创建了一个实现这些接口的监听器:StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener

@Component
public class GenericListener implements StepExecutionListener, SkipPolicy, ItemReadListener, ItemProcessListener, ItemWriteListener {
    private Log logger = LogFactory.getLog(getClass());
    private JobExecution jobExecution;
    private int numeroProcess = 0;
    private int currentReadIndex = 0;
    private int currentProcessIndex = 0;
    private int currentWriteIndex = 0;

    @Override
    public void beforeRead() throws Exception {
        log.info(String.format("[read][line : %s]", currentReadIndex));
        currentReadIndex++;
    }
    @Override
    public void afterRead (Object o) throws Exception {
        log.info("Ligne correct");
    }
    @Override
    public void onReadError (Exception e) throws Exception {
        jobExecution.stop();
    }
    @Override
    public boolean shouldSkip (Throwable throwable, int i) throws SkipLimitExceededException {
        String err = String.format("Erreur a la ligne %s | message %s | cause %s | stacktrace %s", numeroProcess, throwable.getMessage(), throwable.getCause().getMessage(), throwable.getCause().getStackTrace());
        log.error(err);
        return true;
    }
    @Override
    public void beforeProcess (Object o) {
        log.debug(String .format("[process:%s][%s][Object:%s]", numeroProcess++, o.getClass(), o.toString()));
        currentProcessIndex++;
    }
    @Override
    public void afterProcess (Object o, Object o2) { }
    @Override
    public void onProcessError (Object o, Exception e) {
        String err = String.format("[ProcessError at %s][Object %s][Exception %s][Trace %s]", currentProcessIndex, o.toString(), e.getMessage(), e.getStackTrace());
        log.error(err);
        jobExecution.stop();
    }
    @Override
    public void beforeWrite (List list) {
        log.info(String .format("[write][chunk number:%s][current chunk size %s]", currentWriteIndex, list != null ? list.size() : 0));
        currentWriteIndex++;
    }
    @Override
    public void afterWrite (List list) { }
    @Override
    public void onWriteError (Exception e, List list) {
        jobExecution.stop();
    }
    @Override
    public void beforeStep(StepExecution stepExecution) {
        jobExecution = stepExecution.getJobExecution();
        currentReadIndex = 0;
        currentProcessIndex = 0;
        currentWriteIndex = 0;
    }
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

作业定义(CustomJobListener 是一个扩展 JobExecutionListenerSupport 的简单类)

public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobs;

    @Bean
    public Job job(CustomJobListener listener,
                     @Qualifier("step1") Step step1,
                     @Qualifier("step2") Step step2,
                     @Qualifier("step3") Step step3) {
        return jobs.get("SimpleJobName")
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .listener(listener)
                .start(step1)
                .next(step2)
                .next(step3)
                .build();
    }
}

步骤定义(所有三个步骤的定义相同,只是读取/处理器/写入器发生了变化)

@Component
public class StepControleFormat {
    @Autowired
    private StepOneReader reader;
    @Autowired
    private StepOneProcessor processor;
    @Autowired
    private StepOneWriter writer;
    @Autowired
    private ConfigAccess configAccess;
    @Autowired
    private GenericListener listener;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    @JobScope
    @Qualifier("step1")
    public Step stepOne() throws StepException {
        return stepBuilderFactory.get("step1")
                .<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
                .listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
                .faultTolerant()
                .skipPolicy(listener)
                .reader(reader.read())
                .processor(processor.compose())
                .writer(writer)
                .build();
    }
}

现在的问题是beforeStep(StepExecution stepExecution)afterStep(StepExecution stepExecution) 方法没有被触发,但是GenericListener 中的所有其他方法在它们各自的事件发生时都会被正确触发。

我尝试使用listener((StepExecutionListener)listener) 而不是listener((ItemProcessListener&lt;? super StepOneInput, ? super StepOneOutput&gt;) listener),但后者返回AbstractTaskletStepBuiler,然后我无法使用readerprocessorwriter

更新:我的spring boot版本是:v1.5.9.RELEASE

【问题讨论】:

  • 在设置阅读器/处理器/编写器后,使用StepExecutionListener 演员调用监听器。
  • @MichaelMinella 我做了,但它在listener((StepExecutionListener)listener) 之后生成编译器错误Cannot resolve method 'build()'
  • 您使用的是什么版本的 Spring Batch?我确认这编译得很好:gist.github.com/mminella/3437f89aa6ca85c6f94bd7772ed498a2
  • @MichaelMinella 我更新了我的问题并回答了它(下面的答案),但是我的答案中有些东西仍然令人困惑,所以如果你能帮忙的话。谢谢。

标签: java spring spring-batch


【解决方案1】:

感谢 Michael Minella 的提示,我解决了这个问题:

@Bean
@JobScope
@Qualifier("step1")
public Step stepOne() throws StepException {
    SimpleStepBuilder<StepOneInput, StepOneOutput> builder = stepBuilderFactory.get("step1")
            .<StepOneInput, StepOneOutput>chunk(configAccess.getChunkSize())
            // setting up listener for Read/Process/Write
            .listener((ItemProcessListener<? super StepOneInput, ? super StepOneOutput>) listener)
            .faultTolerant()
            // setting up listener for skipPolicy
            .skipPolicy(listener)
            .reader(reader.read())
            .processor(processor.compose())
            .writer(writer);

    // for step execution listener
    builder.listener((StepExecutionListener)listener);

    return builder.build();
}

最后调用的listener 方法public B listener(StepExecutionListener listener)StepBuilderHelper&lt;B extends StepBuilderHelper&lt;B&gt;&gt; 返回一个StepBuilderHelper,它不包含build() 方法的定义。所以解决方案是拆分步骤构建定义。

我不明白的是:虽然writer 方法返回一个SimpleStepBuilder&lt;I, O&gt;,其中包含此方法public SimpleStepBuilder listener(Object listener) 的定义,但编译器/IDE(IntelliJ IDEA)正在从@987654331 调用public B listener(StepExecutionListener listener) @。如果有人可以帮助解释这种行为。

此外,找到一种方法来使用来自SimpleStepBuilderpublic SimpleStepBuilder listener(Object listener) 在一个呼叫中连接所有听众将非常有趣。

【讨论】:

  • 感谢您发布此回复。这真的很有用,对我有用。
【解决方案2】:

可以按如下方式添加额外的 Step Listeners。

@Bean(name = STEP1)
public Step rpcbcStep() {
    
    SimpleStepBuilder<Employee, Employee> builder = stepBuilderFactory.get(STEP1).<Employee, Employee>chunk(100)
            .reader(step1BackgroundReader())
            .processor(processor())
            .writer(writer());
            
    builder.listener(step1BackgroundStepListener)
    builder.listener(step1BackgroundStepListener2);
    // add any other listeners needed
    
    return builder.build();
}

【讨论】:

    猜你喜欢
    • 2018-03-23
    • 2020-09-10
    • 2017-01-08
    • 2013-09-30
    • 2018-02-06
    • 1970-01-01
    • 1970-01-01
    • 2016-05-14
    • 1970-01-01
    相关资源
    最近更新 更多