【问题标题】:Partitioned Job can't stop by itself after finishing? Spring Batch分区作业完成后无法自行停止?春季批次
【发布时间】:2016-03-01 02:35:28
【问题描述】:

我编写了一个包含两个步骤的作业,其中两个步骤之一是分区步骤。 分区步骤使用 TaskExecutorPartitionHandler 并在线程中运行 5 个从属步骤。 该作业在 main() 方法中启动。但它并没有在每个奴隶 ItemReader 返回 null 后停止 - 完成符号。即使在程序运行通过 main() 方法中的最后一行代码(即 System.out.println("Finished"))之后,程序进程也不会停止,挂在内存中并且什么也不做。我必须按下 Eclipse 面板上的停止按钮来停止程序。

以下是 JobLauncher.run() 返回的 JobExecution 的内容,表示 Job 运行成功状态..

JobExecution: id=0, version=2, startTime=Fri Nov 27 06:05:23 CST 2015, endTime=Fri Nov 27 06:05:39 CST 2015, lastUpdated=Fri Nov 27 06:05:39 CST 2015, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, Job=[jobCensoredPages]], jobParameters=[{}]

7217
完成

为什么一个成功运行 Job 的 Spring Batch 程序仍然挂起? 请指出我在哪里解决。我怀疑 Spring Batch 管理的多线程部分不会停止..

简单的作业运行代码

        Job job = (Job) context.getBean("jobPages");
        try {
            JobParameters p=new JobParametersBuilder()
                .toJobParameters();
            
            JobExecution result = launcher.run(job, new JobParameters());
         
            System.out.println(result.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.getBean("idSet");
        AtomicInteger n=(AtomicInteger) context.getBean("pageCount");
        System.out.println(n.get());        
        System.out.println("Finished");

Patitioner 和 PatitionHandler 的配置

    @Bean @Autowired 
    public PartitionHandler beanPartitionHandler(
        TaskExecutor beanTaskExecutor, 
        @Qualifier("beanStepSlave") Step beanStepSlave
        ) throws Exception
    {
        TaskExecutorPartitionHandler h=new TaskExecutorPartitionHandler();
        h.setGridSize(5);
        h.setTaskExecutor(beanTaskExecutor);
        h.setStep(beanStepSlave);   
        h.afterPropertiesSet(); 
        return h;
    }
    @Bean public TaskExecutor beanTaskExecutor() {
        ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
        e.setMaxPoolSize(5);
        e.setCorePoolSize(5);
        e.afterPropertiesSet();
        return e;
    }

唯一的步骤,它是从属步骤

@Bean public Step beanStepMaster(
        Step beanStepSlave,
        Partitioner beanPartitioner,
        PartitionHandler beanPartitionHandler
        )   throws Exception 
    {
        return stepBuilderFactory().get("stepMaster")
        .partitioner(beanStepSlave)
        .partitioner("stepSlave", beanPartitioner)
        .partitionHandler(partitionHandler)
        .build();
    }
    @Bean @Autowired 
    public Step beanStepSlave(
        ItemReader<String> beanReaderTest,
        ItemProcessor<String, String> beanProcessorTest,
        ItemWriter<String> beanWriterTest) throws Exception{
        return stepBuilderFactory().get("stepSlave")
            .<String, String>chunk(1)
            .reader(beanReaderTest)
            .processor(beanProcessorTest)
            .writer(beanWriterTest)
            .build();
    }

我的 pom.xml 文件

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>RELEASE</version>      
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
    
<dependency>    
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>RELEASE</version>
</dependency>

【问题讨论】:

  • 您好,欢迎来到 Stack Overflow。首先,您应该阅读stackoverflow.com/help/how-to-ask 的提问指南以寻求帮助。如果您在遇到错误时将代码(使用编辑器中的代码格式化选项)包含在 cmets 中,您的问题将获得更多帮助。人们将更容易理解您的问题并提供帮助。
  • 我只是希望那些熟悉 Spring Batch 内部结构或解决了相同问题的人能够对我的问题有所了解。我可以提供您进行分析所需的一切..
  • 与 Spring Batch 框架相关。
  • SpringBatch 没有挂起,它完成了。但是由于某种原因,spring 上下文没有关闭。你在使用 SpringBoot 吗?和/或您是否有任何其他活动的弹簧模块,例如 web 模块?还是执行器?这将有助于了解您的 pom 中有哪些依赖项;你能把它们贴出来吗?此外,在您收到“已完成”消息后的线程转储可以提供有意义的见解。
  • +Hansjoerg Wingeier 感谢您提供非常有用的见解。我没有使用 SpringBoot,只是在 helloworld 命令行程序中使用 SpringBatch。我在 pom.ini 中的版本标记中添加了“RELEASE”,因此 Spring Framework 和 SpringBatch 的版本应该是最新的。目前我怀疑挂起是由误用的 Partitioner 或 PartitionHandler 或 TaskExecutor 引起的。它们是如此令人困惑,因为关于如何使用它们的例子很少。我不知道分区步骤如何处理线程,因此线程转储似乎不太可能。但在日志文件中,5 个设计线程中只有一个似乎工作

标签: java spring spring-batch partition


【解决方案1】:

当我使用 ThreadPoolTask​​Executor 时,我的分区 Spring 批处理应用程序在完成时挂起也遇到了困难。另外,我看到执行器不允许所有分区的工作完成。

我找到了两种解决这些问题的方法。

第一个解决方案是使用 SimpleAsyncTaskExecutor 而不是 ThreadPoolTask​​Executor。如果您不介意重新创建线程的额外开销,这是一个简单的解决方法。

第二种解决方案是在 ThreadPoolTask​​Executor 上创建一个调用 shutdown 的 JobExecutionListener。

我这样创建了一个 JobExecutionListener:

@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
    return new JobExecutionListener() {
        private ThreadPoolTaskExecutor taskExecutor = executor;
        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            taskExecutor.shutdown();
        }
    };
}

并将其添加到我的作业定义中,如下所示:

@Bean
public Job partitionedJob(){
    return jobBuilders.get("partitionedJob")
            .listener(jobExecutionListener(taskExecutor()))
            .start(partitionedStep())
            .build();
}

【讨论】:

  • System.exit() 解决方案有效,但看起来很老套。这是一个更好的答案。
  • 简单总是甜蜜的。!很棒的答案。
【解决方案2】:

以上所有答案都是hack/workaround。 问题中发布的问题的根本原因是 threadPoolTask​​Executor 不共享步骤的生命周期。因此,在销毁步骤/作业上下文时,线程池不会自动销毁,它会永远运行。 将 threadPoolExecutor 带入 stepContext "@StepScope" 应该可以解决问题。春天负责摧毁它。

@Bean @StepScope public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

【讨论】:

    【解决方案3】:

    您的问题有两种解决方案,虽然我不知道原因。

    首先,您可以使用CommandLineJobRunner 来启动Job。请参阅文档here。此类在作业结束时自动退出程序并将 ExitStatus 转换为返回码(COMPLETED = 0、FAILED = 1...)。默认返回码由SimpleJvmExitCodeMapper 提供。

    第二种解决方案是在您的JobLauncher.run() 之后手动调用System.exit() 指令。您也可以手动转换JobExitStatus 并在手动退出时使用它:

    // Create Job
    JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
    Job job = (Job) context.getBean(jobName);
    
    // Create return codes mapper
    SimpleJvmExitCodeMapper mapper = new SimpleJvmExitCodeMapper();
    
    // Start Job
    JobExecution execution = jobLauncher.run(job, new JobParameters());
    
    // Close context
    context.close();
    
    // Map codes and exit
    String status = execution.getExitStatus().getExitCode();
    Integer returnCode = mapper.intValue(status);
    System.exit(returnCode);
    

    【讨论】:

    • 谢谢。你说的对。添加 System.exit() 最后确实退出程序。这告诉我问题可能出在分区步骤中。我不知道究竟是什么原因导致 SpringBatch 挂起...我希望文档中有更多关于 Partition Step 行为的示例,特别是关于如何使用 Paritioner、paritionHandler、TaskExecutor 进行线程处理。我只在网上找到了一个,在 Pro SpringBatch 一书中找到了另一个。。谢谢。。你的代码帮助解决了这个问题。
    • // 创建返回码映射器 SimpleJvmExitCodeMapper mapper = new SimpleJvmExitCodeMapper(); if (jobExecution.getStatus() == BatchStatus.COMPLETED) { LOGGER.info("作业完成。"); } else if (jobExecution.getStatus() == BatchStatus.FAILED) { LOGGER.error("******Job failed..."); } 字符串状态 = jobExecution.getExitStatus().getExitCode();整数 returnCode = mapper.intValue(status); System.exit(returnCode);
    猜你喜欢
    • 2020-04-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-12
    • 1970-01-01
    • 1970-01-01
    • 2019-06-30
    • 1970-01-01
    相关资源
    最近更新 更多