【问题标题】:Partitioner for JdbcCursorItemReader - Reader must be open before it can be readJdbcCursorItemReader 的分区器 - 读取器必须先打开才能读取
【发布时间】:2016-12-05 17:18:15
【问题描述】:

我有一个工作的 spring 批处理作业,当我尝试使用分区器创建多线程时,我开始获取 Reader 必须打开才能读取它。

org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
    at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:443) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at com.sun.proxy.$Proxy58.read(Unknown Source) ~[na:na]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

2016-08-01 14:26:14.700 ERROR 12716 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step partitionStep in job exportMasterListCsv

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.test.JobLauncherTestUtils.launchJob(JobLauncherTestUtils.java:152) [spring-batch-test-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at com.myer.pricing.onlinestore.export.job.ExportMasterListCsvTest.testLaunchJob(ExportMasterListCsvTest.java:60) [test-classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) [junit-4.11.jar:na]
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.11.jar:na]
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) [junit-4.11.jar:na]
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.11.jar:na]
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.11.jar:na]
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) [junit-4.11.jar:na]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) [junit-4.11.jar:na]
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) [junit-4.11.jar:na]
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) [junit-4.11.jar:na]
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) [junit-4.11.jar:na]
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) [junit-4.11.jar:na]
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309) [junit-4.11.jar:na]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE]
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) [.cp/:na]

我只是想知道 JdbcCursorItemReader 是否不是线程安全的,因此也许我会收到此错误?

我大致是这样安排我的工作的……

@Configuration
public class ExportMasterListCsvJobConfig {

public static final String JOB_NAME = "exportMasterListCsv";

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Bean
public TaskExecutor taskExecutor() {
  return new SimpleAsyncTaskExecutor();
}


/**
 * Create job to export the master list from the online pricing staging db.
 * 
 * @param readStgDbAndExportMasterListStep read online staging db step and export to csv
 * @return job to export the master list from the online pricing staging db.
 */
@Bean
public Job exportMasterListCsvJob(@Qualifier("partitionStep") Step partitionStep) {

    return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new RunIdIncrementer())
            .start(partitionStep)
            .build();
}

  @Bean
  public Step partitionStep(@Qualifier("readStgDbAndExportMasterListStep") Step readStgDbAndExportMasterListStep){
    return stepBuilderFactory.get("partitionStep")
        .partitioner(readStgDbAndExportMasterListStep)
        .partitioner("readStgDbAndExportMasterListStep", partitioner())
        .taskExecutor(taskExecutor())
        .build();
  }

/**
 * Read from online pricing staging db.
 * 
 * @param chunkSize
 * @param queryOnlineStagingDbReaderForMasterList
 * @param masterListOutputProcessor
 * @param masterListFileWriter
 * @return step to read from online pricing staging db.
 */
@Bean
public Step readStgDbAndExportMasterListStep(
        @Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") int chunkSize,
        @Qualifier("queryOnlineStagingDbReaderForMasterList") ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList,
        @Qualifier("masterListOutputProcessor") ItemProcessor<MasterList,MasterList> masterListOutputProcessor,
        @Qualifier("masterListFileWriter") ItemWriter<MasterList> masterListFileWriter) {

    return  stepBuilderFactory.get("readStgDbAndExportMasterListStep")
                .<MasterList,MasterList>chunk(chunkSize)
                .reader(queryOnlineStagingDbReaderForMasterList)
                .processor(masterListOutputProcessor)
                .writer(masterListFileWriter)
                .build();

}   

/**
 * Query and map rows from online pricing staging db into a master list object.
 * 
 * @param onlineStagingDb
 * @param masterListSql
 * @return
 */
@Bean
@StepScope
public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList(
        DataSource onlineStagingDb,
        @Value("#{stepExecutionContext[divisionId]}") Integer divisionId,
        @Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") String masterListSql) {

    JdbcCursorItemReader<MasterList> reader = new JdbcCursorItemReader<>();

    reader.setDataSource(onlineStagingDb);
    reader.setSql(masterListSql);   
    reader.setPreparedStatementSetter(new PreparedStatementSetter() {
        public void setValues(PreparedStatement ps) throws SQLException {
            ps.setInt(1, divisionId);
        }
    });


    reader.setRowMapper(new RowMapper<MasterList>() { 
        @Override 
        public MasterList mapRow(ResultSet resultSet, int i) throws SQLException {
            MasterList masterList = new MasterList();
            masterList.setL1(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_ONE));

            return masterList;
        } 
    });         
    return reader;
}   

@Bean
public Partitioner partitioner(){
    return new DivisionPartitioner();
}

我一直在网上阅读。这可能与https://jira.spring.io/browse/BATCH-1301 有关吗? 谢谢

【问题讨论】:

  • 只有部分代码被正确粘贴; queryOnlineStagingDbReaderForMasterList 被切断。
  • 添加了一些我的实现给你看看

标签: spring spring-batch


【解决方案1】:

我只是想知道 JdbcCursorItemReader 是否不是线程安全的 也许为什么我会收到这个错误?

JdbcCursorItemReader 是 AbstractCursorItemReader,而 AbstractCursorItemReader 又是 AbstractItemCountingItemStreamItemReader。

根据 AbstractItemCountingItemStreamItemReader 的文档实现,不是线程安全的

http://docs.spring.io/spring-batch/apidocs/org/springframework/batch/item/support/AbstractItemCountingItemStreamItemReader.html

ItemReaders 的抽象超类,支持通过存储重新启动 ExecutionContext 中的项目计数(因此需要项目排序 在运行之间保留)。 子类本质上不是 线程安全。

这部分日志暗示了线程问题:

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

尝试通过将 taskexecutor 设置为仅使用一个线程来顺序运行它。

public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1);
    return taskExecutor;
}

【讨论】:

  • 嗨,将 concurrencyLimit 设置为 1 仍然给我同样的错误。但是感谢您指出堆栈跟踪表明这与线程有关。如果有人有任何想法,仍在寻找解决此问题的方法。谢谢
  • 我一直在为这段代码使用 bean 范围,我认为它让我有所收获。我现在有一个不同的问题,关于如何确定我的 bean 的范围。我想我会结束这个问题并开始一个针对范围问题的新问题。感谢您的帮助
  • 很高兴它对您有所帮助。关于 bean 范围,它与属性的绑定有关(例如,在什么时间),请参阅docs.spring.io/spring-batch/reference/html/…
【解决方案2】:

第一次阅读时,读者需要很长时间才能准备好。有时 rowMapper 可能会走得太远,它认为阅读器没有打开,而它只是仍在打开的过程中。要解决这个问题,只需在第一次使用 rowMapper 时让它进入休眠状态。将以下块添加到您的 setRowMapper 中,看看错误是否消失。

try {
    if(firstTime) {
        Thread.sleep(20000);
        firstTime = false;
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

【讨论】:

    【解决方案3】:

    这是由于阅读器的返回类型所致

    public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList 
    

    它应该改为JdbcCursorItemReader,这是使用该方法构建的阅读器。否则 bean reader 的方法的可见性会被限制在更高的级别,并且无法正确打开 reader。

    【讨论】:

      猜你喜欢
      • 2014-07-13
      • 2019-10-24
      • 1970-01-01
      • 1970-01-01
      • 2014-09-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多