【问题标题】:How to run a step in a loop in Spring Batch : Updated如何在 Spring Batch 中循环运行一个步骤:更新
【发布时间】:2016-12-05 13:45:27
【问题描述】:

我有以下春季批处理作业:

<job id="MyBatchJob" job-repository="jobRepository">

  <step id="ConfigurationReadStep">
        <tasklet ref="ConfigurationReadTasklet"  transaction-manager="jobRepository-transactionManager"/>
        <next on="COMPLETED"  to="NextStep" />  
  </step>        

  <step id="NextStep">
    <tasklet transaction-manager="jobRepository-transactionManager">
        <chunk reader="myItemReader" writer="myItemWriter" commit-interval="1000"/>
    </tasklet>
  </step>

     <listeners>
        <listener ref="jobListener" />
     </listeners>     

</job>

它的第一步是配置读取步骤,在经过一些业务逻辑之后,我遇到了一个查询列表。例如说 10 个查询。我知道我可以使用JobExecutionContextPromotionListener 来推广这个列表。

但是,我想将此查询 1 一个 1 地提供给下一步的阅读器作为阅读器查询,并在循环中运行该步骤,直到所有阅读器查询都被消耗完。我想将每个查询作为春季批处理场景运行,因为它们可能会返回大量项目。

我该怎么做?

**************************************** 更新 *********** ***********************

这就是我想要做的:

<job id="MyJob" job-repository="jobRepository">

      <step id="ConfigurationReadStep">
            <tasklet ref="ConfigurationReadTasklet"  transaction-manager="jobRepository-transactionManager"/>
            <next on="COMPLETED"  to="MyNextStep" />    
             <listeners>
                <listener ref="promotionListener"/>
            </listeners>
      </step>        

    <step id="MyNextStep" next="limitDecision">
        <tasklet transaction-manager="jobRepository-transactionManager">
            <chunk reader="MyItemReader" writer="MyItemWriter" commit-interval="1000"/>
        </tasklet>
    </step>

    <decision id="limitDecision" decider="limitDecider">
        <next on="CONTINUE" to="MyNextStep" />
        <end on="COMPLETED" />
    </decision>

     <listeners>
        <listener ref="jobListener" />
     </listeners>

  </job>

<beans:bean id="jobListener" class="com.hsbc.gbm.dml.integration.batch.listener.SimpleJobListener" />

<beans:bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
    <beans:property name="keys" value="readerLimit,readerQueries,readerSQL" />
</beans:bean>

  <beans:bean id="ConfigurationReadTasklet" class="com.mypackage.ConfigurationReadTasklet" scope="step">
  </beans:bean>

<beans:bean id="MyItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <beans:property name="dataSource" ref="myDataSource" />
    <beans:property name="sql" value="#{jobExecutionContext['readerSQL']}" />
    <beans:property name="rowMapper">
        <beans:bean  name="mapper" class="com.mypackage.MyRowMapper" />
    </beans:property>
</beans:bean>

<beans:bean id="MyItemWriter" class="com.mypackage.MyItemWriter" scope="step">
    <beans:constructor-arg ref="myDataSource" />
</beans:bean>

<beans:bean id="limitDecider" class="com.mypackage.StepLoopLimitDecider" scope="step"> 
    <beans:property name="readerQueries" value="#{jobExecutionContext['readerQueries']}" />
</beans:bean>     

在 tasklet 中,我将获得读者查询列表。我会将readerQuery 设置为其中的第一个,然后进入下一步,该步骤将作为春季批处理步骤正常运行。

一旦完成,我希望我的Decider 检查是否有更多读者查询,如果是,它将readerQuery 更改为下一个查询并重新运行NextStep,否则工作将完成。以下是我的决定者:

public class StepLoopLimitDecider implements JobExecutionDecider {

private List<String> readerQueries;

private int count = 1;

public FlowExecutionStatus decide(JobExecution jobExecution,StepExecution stepExecution) {
    if (count >= this.readerQueries.size()) {
        return new FlowExecutionStatus("COMPLETED");
    }
    else {
        System.out.println(count + ": "+readerQueries.get(count));
        jobExecution.getExecutionContext().put("readerSQL", readerQueries.get(count));
        count = count + 1;
        return new FlowExecutionStatus("CONTINUE");
    }
}

public void setReaderQueries(List<String> readerQueries) {
    this.readerQueries = readerQueries;
}   
}

但是,这不起作用。步骤第一次正确运行。但是决策者失败并出现以下错误:

    2016-12-06 14:46:24 ERROR AbstractJob:306 - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:141)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
    at org.springframework.batch.core.launch.support.CommandLineJobRunner.start(CommandLineJobRunner.java:348)
    at org.springframework.batch.core.launch.support.CommandLineJobRunner.main(CommandLineJobRunner.java:565)
Caused by: 
org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=MyBatchJob at state=MyBatchJob.limitDecision with exception
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:152)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
    ... 6 more
Caused by: 
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.limitDecider': Scope 'step' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for step scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:339)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:190)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:33)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:182)
    at $Proxy4.decide(Unknown Source)
    at org.springframework.batch.core.job.flow.support.state.DecisionState.handle(DecisionState.java:43)
    at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean$DelegateState.handle(SimpleFlowFactoryBean.java:141)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
    ... 8 more
Caused by: 
java.lang.IllegalStateException: No context holder available for step scope
    at org.springframework.batch.core.scope.StepScope.getContext(StepScope.java:197)
    at org.springframework.batch.core.scope.StepScope.get(StepScope.java:139)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:325)
    ... 15 more

有人可以帮我完成这项工作吗?

【问题讨论】:

  • 决策不是 Spring Batch 中的一个步骤,因此它不能是步骤范围。使该工作具有范围,并且事情应该从我所看到的范围内进行...

标签: java spring spring-batch step


【解决方案1】:

我建议您根据计算查询的业务逻辑的结果动态构建您的工作。

但是,你不能使用 xml-configuration 来做到这一点,因此,我建议你使用 java-config api。

我已经写了几个关于“如何动态创建工作”的类似问题的答案。如果您需要进一步的建议,请查看它们并告诉我。

SpringBatch - javaconfig vs xml

Spring Batch Java Config: Skip step when exception and go to next steps

Spring batch repeat step ending up in never ending loop

Spring Batch - How to generate parallel steps based on params created in a previous step

Spring Batch - Looping a reader/processor/writer step

【讨论】:

  • 谢谢。你能检查我的问题的更新吗?我正在尝试一种方法,但它不起作用。
  • 这行不通。 SpringBatch 不打算在单个作业启动期间多次执行特定步骤。该框架保留了步骤及其读取器和写入器的多个内部状态。您需要在运行时侵入实例化的作业结构,并以某种方式重置步骤和读取器和写入器的状态。但这太丑陋了,太肮脏了。绝对不是你想做的事。
  • 正如我试图解释的那样,请熟悉 Java-API。然后使用 Java 代码而不是 XML 创建您的作业。据我了解,启动应用程序后,您可以确定必须创建的不同步骤。您可以使用 Java-Api 执行此操作。看看我的回答:stackoverflow.com/questions/37238813/…
猜你喜欢
  • 2023-03-13
  • 1970-01-01
  • 1970-01-01
  • 2022-01-07
  • 2019-12-11
  • 2016-04-24
  • 1970-01-01
  • 2018-02-26
  • 2021-06-09
相关资源
最近更新 更多