【Title】: Scope 'job' is not active for the current thread, No context holder available for job scope Spring-Batch
【Posted】: 2021-08-06 00:21:20
【Question】:

In my Spring Batch job, I am trying to share data between steps using the JobExecutionContext. This works only if I keep the steps single-threaded, as follows:

    @EnableTask
    @EnableBatchProcessing
    @Configuration
    @PropertySource(value = {"classpath:application.properties"})

    public class Config{

    private static final HashMap<String,Object> OVERRIDDEN_BY_EXPRESSION = null;
    private static final String QUERY = "SELECT * FROM \"Config\"";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    EntityManager em;


    @Autowired
    DataSource dataSource;


    /*Config Step*/

    @Bean
    public JdbcCursorItemReader<BatchConfig> configReader(DataSource dataSource) {
        JdbcCursorItemReader<BatchConfig> config = new JdbcCursorItemReader<>();
            config.setDataSource(dataSource);

            config.setSql(QUERY);
            config.setRowMapper(new BatchRowMapper());

            return config;
    }

    @Bean
    public ItemWriter<BatchConfig> itemWriter() {
        return new ItemWriter<BatchConfig>() {

            private StepExecution stepExecution;

            @Override
            public void write(List<? extends BatchConfig> items) {
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                for (BatchConfig item : items) {
                    HashMap<String, Object> table = new HashMap<>();
                    table.put("date", item.getDate_time());
                    table.put("size", item.getSize());
                    System.out.println(table);
                    stepContext.put(item.getName(), table);

                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step stepConfig(JdbcCursorItemReader<BatchConfig> configReader) throws Exception {
        return stepBuilderFactory.get("stepConfig")
                .<BatchConfig, BatchConfig>chunk(10)
                .reader(configReader)
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"COUNTRY", "CATEGORY", "USER"});
        return listener;
    }



    /*Country Step*/

    @JobScope
    @Bean
    public MongoItemReader<COUNTRY> CountryItemReader(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> table) {
        int date = (int) table.get("date");
        MongoItemReader<COUNTRY> reader = new MongoItemReader<COUNTRY>();
        reader.setTemplate(mongoTemplate);
        reader.setTargetType(COUNTRY.class);
        reader.setCollection("COUNTRY");
        reader.setFields("{\"COUNTRY_NAME\": 1,\"SHORT_NAME\": 1,\"DEPT_CODE\": 1}");
        reader.setSort(new HashMap<String, Sort.Direction>() {{
            put("_id", Sort.Direction.DESC);
        }});
        reader.setQuery("{DATE_TIME: {$gt:"+date+"}}");
        reader.setPageSize(250);
        return reader;
    }

    @Bean
    public CountryItemProcessor CountryProcessor(){
        return new CountryItemProcessor();
    }

    @Bean
    public JpaItemWriter<COUNTRY> country_writer(){
        JpaItemWriter<COUNTRY> jpa = new JpaItemWriter<COUNTRY>();
        jpa.setEntityManagerFactory(em.getEntityManagerFactory());
        return jpa;
    }


    @JobScope
    @Bean
    public Step step1(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> tab) {
        int size = (int) tab.get("size");
        //System.out.println(size);
        return stepBuilderFactory.get("step1")
                .<COUNTRY, COUNTRY>chunk(20)
                .reader(CountryItemReader(OVERRIDDEN_BY_EXPRESSION))
                .writer(country_writer())
                .build();
    }



    @Bean
    public Job TestJob(Step stepConfig) throws Exception {

        return this.jobBuilderFactory.get("TestJob")
                .incrementer(new RunIdIncrementer())// because a spring config bug, this incrementer is not really useful
                .start(stepConfig)
                .next(step1(OVERRIDDEN_BY_EXPRESSION))
               
                .build();
    }

}

But when I add a SimpleAsyncTaskExecutor, I get this error:

    org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.CountryItemReader': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:383) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.batch.item.data.MongoItemReader$$EnhancerBySpringCGLIB$$67443e4.read(<generated>) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar:5.3.6]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371) ~[spring-beans-5.3.6.jar:5.3.6]

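I tried workarounds such as the one in https://github.com/spring-projects/spring-batch/issues/1335, but it seems to use only one thread besides the main thread.

Is there a way to solve this without adding workaround code?

I plan to scale the job on Kubernetes using remote partitioning. Will this problem persist there because of the job scope?

Any ideas or suggestions are welcome.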

【Discussion】:

    Tags: multithreading spring-boot spring-batch


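    【Solution 1】:

    "I am trying to share data between steps using the JobExecutionContext, which works only if I keep the steps single-threaded"

    Relying on the execution context to share data between multi-threaded steps is incorrect, because the keys will be overwritten by concurrent threads. The reference documentation explicitly mentions turning off state management in multi-threaded environments: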

    • Javadoc: remember to use saveState=false if used in a multi-threaded client
    • Reference doc: it is not recommended to use job-scoped beans in multi-threaded or partitioned steps
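The exception in the question ("Scope 'job' is not active for the current thread") is consistent with the job context being registered per thread, so threads spawned by a task executor do not see it. The following plain-Java sketch illustrates that behaviour only; the `CONTEXT` holder and the class name are hypothetical stand-ins, not Spring Batch's actual implementation:

```java
import java.util.concurrent.atomic.AtomicReference;

final class ThreadBoundContextDemo {

    // Hypothetical stand-in for a per-thread job context holder (an assumption, not library code).
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Binds a value on the calling thread, then reports what a freshly started worker thread sees.
    static String valueSeenByWorker() {
        CONTEXT.set("jobExecution#1");
        try {
            AtomicReference<String> seen = new AtomicReference<>("unset");
            Thread worker = new Thread(() -> seen.set(String.valueOf(CONTEXT.get())));
            worker.start();
            worker.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        // The worker never had the value bound, so it reads null.
        System.out.println("worker sees: " + valueSeenByWorker());
    }
}
```

The launching thread has a value bound, but the worker reads `null`, which is the situation a job-scoped proxy would run into when it is resolved on a worker thread.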

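    That said, I don't see which keys could be shared from a multi-threaded step to the next step (since the threads execute in parallel), but if you really need to do that, you should use another approach, such as defining a thread-safe shared bean.

    【Comments】: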

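    • Thanks, but how do I define a thread-safe shared bean?
    • For example, by creating a class that wraps a thread-safe data structure (such as a ConcurrentHashMap) and defining an instance of it as a bean.
    • That is actually what I did, but I still have a couple of questions: 1. Can I use a variable in the step method, for example to define the chunk size dynamically? 2. Does using a multi-threaded step affect restartability?
    • 1. Yes, making the step job-scoped is the right approach, so that creation of the step bean is deferred and only happens at runtime. 2. It depends: the issue that may arise is concurrent threads overwriting each other's restart data (read.count, write.count, etc.) in the execution context, but if you manage to prevent that, you can keep restartability.
    • What would you suggest to keep restartability? It is critical for my use case.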
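As a minimal sketch of the thread-safe shared bean suggested in the comments above: the class name `SharedJobData`, its methods, and the sample values are all hypothetical. In a Spring application an instance would presumably be exposed as a singleton bean and injected into both the writer of the first step and the reader of the second; here it is exercised with plain threads so that it runs on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for data shared between steps; not part of Spring Batch.
final class SharedJobData {

    private final Map<String, Map<String, Object>> tables = new ConcurrentHashMap<>();

    // Stores a read-only copy so concurrent readers never observe later mutation by the caller.
    void put(String name, Map<String, Object> table) {
        tables.put(name, Collections.unmodifiableMap(new HashMap<>(table)));
    }

    Map<String, Object> get(String name) {
        Map<String, Object> table = tables.get(name);
        if (table == null) {
            throw new IllegalStateException("No entry stored for " + name);
        }
        return table;
    }

    // Simulates a single-threaded writer followed by several worker threads reading the same entry.
    static int sumOfSizesSeenByWorkers(int workers) {
        SharedJobData shared = new SharedJobData();
        Map<String, Object> table = new HashMap<>();
        table.put("date", 20210806); // placeholder value for illustration
        table.put("size", 20);       // placeholder value for illustration
        shared.put("COUNTRY", table);

        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> sum.addAndGet((int) shared.get("COUNTRY").get("size")));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSizesSeenByWorkers(4)); // prints 80
    }
}
```

Because the holder lives outside the execution context, it does not depend on a thread-bound job scope. It is also not persisted, so on its own it does nothing for restartability.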