【问题标题】:How to parallelize a single quartz job with thread pooling?如何使用线程池并行化单个石英作业?
【发布时间】:2019-11-06 12:37:17
【问题描述】:

我的 Spring Boot 应用程序中有一个石英计划作业,当每 5 分钟触发一次时,通过一种方法将 大型列表 中的项目发送到某些 Web 服务。

我想知道我的发送过程(单一发送方法)可以并行化吗? 我想要的是例如当 10000 个项目列表来自 db 并且线程池中的线程将同时工作以发送该列表中的所有记录,并且在发送所有记录后作业将完成。

我在下面的代码中尝试过,我设置了一个 ThreadPoolTaskExecutor 和 5 个线程池大小。 然而,当我执行并检查作业日志时,它说作业在几秒钟内完成,但发送所有数据需要几分钟。它继续正常工作,但工作似乎在几秒钟内完成。它可能说在所有线程设置后工作就完成了。这是我避免的,因为需要知道作业执行时间和日志。

@Autowired
MyService myService;

@NonTransactionalService 
public class MySenderService{

    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.initialize();

    public void sendAll(){
        List<Long> largeList = someMethod();
        largeList.stream().forEach(i -> {
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    myService.send(i);
                }
            });
        }
    }
}

那么,如何在单个作业中使用多个工人运行此发送方法?
否则设置多个是一个好习惯相同的作业 是否通过相同的方法发送相同的列表?

【问题讨论】:

  • 您的工作已完成(从 Quartz 的角度来看),但它现在在后台运行。你什么都没有改进。而是将 yuor largeList 分成 x 个部分(可能是 5 个)并启动 5 个单独的任务,每个任务都在列表的一部分上运行。
  • 是的,当我的sendAll() 方法完成时,它表示工作完成。所以 - 只是好奇 - 如果我在 sendAll 方法完成之前循环 while (taskExecutor.getActiveCount() != 0) 并且当它相等时我让方法完成怎么办?这仍然是一个不好的做法吗?总而言之,我想知道你上面建议的正确方法是什么?感谢您的回答。 @M.Deinum
  • 拆分你的列表,使用CompletableFuture.supplyAsync(&lt;runnable&gt;, taskExecutor);这将返回一个CompletableFuture。将它们收集在一个列表中。准备好所有工作块后,使用CompletableFuture.allOf 加入他们。然后在生成的CompletableFuture 上调用join,这将等到所有这些都完成。不要依赖the active count as Ideally you want to inject the TaskExecutor`,没有什么能阻止你重用它。如果您需要更多控制,我建议您使用 Spring Batch,这使得这种操作很容易。
  • 感谢您的帮助,我会尝试您建议的解决方案。我会不断更新结果。 @M.Deinum

标签: java multithreading quartz-scheduler job-scheduling


【解决方案1】:

您可以尝试使用 Flux,恕我直言,这是需要较少代码的解决方案:

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.initialize();

        List<Integer> largeList = someMethod();
        System.out.println(largeList);
        Flux.fromStream(largeList.stream())
                .parallel(5)
                .runOn(Schedulers.fromExecutor(taskExecutor))
                .subscribe( x -> { System.out.println(x);});

        taskExecutor.shutdown();

不要忘记关闭TaskExecutor,而是作为@M。 Deinum 说,最好让它在别处创建并注入到您的服务中。

【讨论】:

    猜你喜欢
    • 2011-10-18
    • 2010-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多