【问题标题】:Using Spring @Scheduled and @Async together一起使用 Spring @Scheduled 和 @Async
【发布时间】:2013-01-15 09:29:37
【问题描述】:

这是我的用例。

旧系统更新数据库队列表 QUEUE。

我想要一个定期的定期工作 - 检查 QUEUE 的内容 - 如果表中有行,它会锁定行并做一些工作 - 删除QUEUE中的行

如果之前的作业仍在运行,则将创建一个新线程来完成该工作。我要配置最大并发线程数。

我正在使用 Spring 3,我目前的解决方案是执行以下操作(使用 1 毫秒的固定速率让线程基本连续运行)

@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
    log.debug("Start schedule");
    publishWorker.start();
    log.debug("End schedule");
}

<task:executor id="workerExecutor" pool-size="4" />

这直接创建了 4 个线程,并且线程正确地共享了队列中的工作负载。但是,当线程需要很长时间才能完成时,我似乎遇到了内存泄漏。

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0                              |              80 |   373,410,496 |     89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940                          |           48 |   373,410,136 |     89.74%
|  |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68  

所以

1:我应该同时使用@Async 和@Scheduled 吗?

2:如果没有,那我还能如何使用 spring 来实现我的要求?

3:如何在其他线程忙的时候才创建新线程?

谢谢大家!

编辑:我认为工作队列变得无限长......现在使用

    <task:executor id="workerExecutor"
    pool-size="1-4"
    queue-capacity="10" rejection-policy="DISCARD" />

将报告结果

【问题讨论】:

  • 没有@Async 就不能正常工作吗?使用@Scheduled 注释的方法无论如何都应该异步执行。
  • 如果您希望“线程连续运行”,那么您首先不应该真正使用@Scheduled。它用于“预定”活动,而不是连续活动......
  • 你可以考虑制作 publishWorker.start();方法异步。

标签: spring memory scheduled-tasks


【解决方案1】:

你可以试试

  1. 以一秒延迟运行调度程序,这将锁定并获取所有 目前尚未锁定的 QUEUE 记录。
  2. 对于每条记录,调用 Async 方法,该方法将处理该记录并将其删除。
  3. 执行程序的拒绝策略应该是 ABORT,以便调度程序可以解锁尚未发出处理的 QUEUE。这样调度程序就可以在下次运行时再次尝试处理这些 QUEUE。

当然,您必须处理调度程序已锁定 QUEUE 的情况,但无论出于何种原因,处理程序都没有完成处理。

伪代码:

public class QueueScheduler {
    @AutoWired
    private QueueHandler queueHandler;

    @Scheduled(fixedDelay = 1000)
    public void doSchedule() throws InterruptedException {
        log.debug("Start schedule");
        List<Long> queueIds = lockAndFetchAllUnlockedQueues();
        for (long id : queueIds)
            queueHandler.process(id);
        log.debug("End schedule");
    }
}

public class QueueHandler {

    @Async
    public void process(long queueId) {
        // process the QUEUE & delete it from DB
    }
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
     rejection-policy="ABORT"/>

【讨论】:

  • 如果您能以任何方式为lockAndFetchAllUnlockedQueues(); 提供伪代码,那将非常有帮助。
【解决方案2】:
//using a fixedRate of 1 millisecond to get the threads to run basically continuously
@Scheduled(fixedRate = 1)

当您使用@Scheduled 时,将创建一个新线程,并以指定的固定速率在1 毫秒调用doSchedule 方法。当您运行您的应用程序时,您已经可以看到 4 个线程竞争 QUEUE 表,并且可能出现死锁。

通过线程转储调查是否存在死锁。 http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@Async 注解在这里没有任何用处。

实现这一点的更好方法是通过实现 runnable 并将您的类传递给 TaskExecutor 并使用所需的线程数,将您的类创建为线程。

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

还要检查您的设计,它似乎没有正确处理同步。如果前一个作业正在运行并持有该行的锁,那么您创建的下一个作业仍会看到该行并等待获取该特定行的锁。

【讨论】:

    猜你喜欢
    • 2018-10-31
    • 1970-01-01
    • 2015-08-03
    • 2015-07-26
    • 2020-03-15
    • 2019-07-27
    • 2020-06-06
    • 2022-08-20
    • 1970-01-01
    相关资源
    最近更新 更多