【问题标题】:How to get the job id back immediately after setting the concurrency limit设置并发限制后如何立即取回作业ID
【发布时间】:2014-09-30 02:45:06
【问题描述】:

问题是,我设置并发限制后(例如限制为2),当第3个请求进来时,作业启动器不会返回作业执行(作业启动器正在阻塞新的作业执行)。换句话说,我想通过运行“runFun()”方法在收到请求后立即获取作业 ID。

我尝试了 SimpleAsyncTaskExecutor 和 SimpleThreadPoolTask​​Executor,它们都不起作用。并且由于挂起的“jobLauncher.run()”方法,之前的作业监听器也没有作业ID信息。我浏览了源代码(SimpleAsyncTaskExecutor.execute()、ConcurrencyThrottleSupport.beforeAccess()),没有运气解决这个问题。

在这种情况下,当作业启动器运行执行时,如何立即取回作业 ID?

我将主要代码发布如下:

**//Config:**

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor">
            <property name="concurrencyLimit" value="2"/>
        </bean>
    </property>
</bean>

<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
    <property name="dataSource" ref="sampleservice.persist.datasource"/>
</bean>

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator">
    <property name="jobExplorer" ref="jobExplorer"/>
    <property name="jobRepository" ref="jobRepository"/>
    <property name="jobLauncher" ref="jobLauncher"/>
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"></bean>


**//Code:**

    Private Long runFun() {
    new JobParametersBuilder()
                    .addLong("pId", parameterDO.getPId())
                    .toJobParameters();

                try {
                    JobExecution jobExecution = jobLauncher.run(bulkDownloadJob, params);

    //the job execution id will be blocked with status "STARTING" in the batch metadata table until the job is actually started with status "STARTED"
                    return jobExecution.getId();  
    } catch ...

}

经过思考,我正在尝试创建一个新的作业启动器,它会创建一个新线程来专门执行作业并立即返回 jobExecution:

//NewLauncherClass
public JobExecution run(final Job job, final JobParameters jobParameters) {

    ... 

    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        try {
            return jobExecution;
        } finally {
            if (jobExecution != null) {
                new Thread(new Runnable() {
                    public void run() {
                        executeJob(jobExecution, job, jobParameters);
                    }
                }).start();
            } else {
                logger.error("*****");
            }
        }

    }

    //added this method for executing the job
    private void executeJob(final JobExecution jobExecution, final Job job, final JobParameters jobParameters) {
        try {
            logger.info("***** test bulk *****  Job " + jobExecution.getId() + " is going to be executed");

            taskExecutor.execute(new Runnable() {

                ...

【问题讨论】:

    标签: java spring spring-batch


    【解决方案1】:

    您尝试执行的操作不适用于SimpleJobLauncher。问题是TaskExecutor 在使用限制时将阻塞,直到有线程可用于执行任务。问题是您必须有一种方法可以在线程可用于执行它之前返回 JobExecution

    你必须做你正在尝试的消息传递。当前基于消息的组件不会执行此操作,但您可以创建一个 JobLauncher 来创建 JobExecution 并将其发送到 MessageHandler 以执行它。这将允许调用者返回 JobExecution 并且仍然等待执行作业。

    【讨论】:

    • 嗨,迈克尔,非常感谢您的信息。我现在正在研究春季批量集成。你的意思是创建一个自定义的jobLauncher,它可以立即发送回作业执行,然后将其发送给messageHandler执行?如果您能给我更详细的信息,我将不胜感激。如何实现这一点(伪代码就足够了)。
    • 嘿,迈克尔,感谢您提供非常有用的信息。基于这个想法,我按照上面的代码进行了编码以解决问题。基本思想是我创建了一个新线程来执行新作业启动器中的代码。任何 cmets 将不胜感激。
    • 你正在做我建议的同样的事情,你只是手动管理线程。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-12
    • 1970-01-01
    • 2017-12-10
    • 1970-01-01
    • 2018-09-11
    相关资源
    最近更新 更多