【问题标题】:Looking for solution based on ThreadPoolExecutor to ensure sequential execution of tasks寻找基于ThreadPoolExecutor的解决方案,保证任务的顺序执行
【发布时间】:2015-04-17 04:50:49
【问题描述】:

我有一个包含 m 个线程的线程池。假设 m 是 10 并修复。然后有 n 个队列,有可能 n 变大(例如 100'000 或更多)。每个队列都包含要由这 m 个线程执行的任务。现在,非常重要的是,每个队列都必须按任务顺序处理。这是确保任务按照添加到队列的顺序执行的要求。否则,数据可能会变得不一致(例如,与 JMS 队列相同)。

所以现在的问题是如何确保这 n 个队列中的任务由可用的 m 个线程处理,使得添加到同一个队列的任务不能被不同的线程“同时”执行。

我尝试自己解决这个问题,发现它要求很高。 Java ThreadPoolExecutor 很好,但是您必须添加很多不容易开发的功能。所以问题是,是否有人知道已经解决了这个问题的 Java 框架或系统?

更新

感谢 Adrian 和 Tanmay 的建议。队列的数量可能非常大(例如 100'000 或更多)。所以每个队列一个线程是不可能的,尽管它很简单。我将研究 fork join 框架。看起来是一条有趣的道路。

我当前的第一个迭代解决方案是有一个全局队列,所有任务都添加到该队列中(使用 JDK8 TransferQueue,它的锁定开销很小)。任务被包装到带有队列锁及其大小的队列存根中。队列本身在物理上并不存在,只有它的存根。

空闲线程首先需要获取令牌才能访问全局队列(令牌将是阻塞队列中的单个元素,例如 JDK8 TransferQueue)。然后它对全局队列进行阻塞处理。当获得一个任务时,它会检查该任务的队列存根的队列锁是否关闭。实际上,我认为仅使用 AtomicBoolean 就足够了,并且比锁或同步块创建的锁争用更少。

当获得队列锁后,将token返回到全局队列并执行任务。如果未获得,则将任务添加到 2 级队列,并完成来自全局队列的另一个阻塞获取。线程需要检查第二级队列是否为空,并从中获取任务以执行。

此解决方案似乎有效。但是,每个线程在被允许访问全局队列之前需要获取令牌,而二级队列看起来像是一个瓶颈。我相信它会造成高锁争用。所以,我对此不太满意。也许我从这个解决方案开始并详细说明它。

更新 2

好的,现在这里是我迄今为止提出的“最佳”解决方案。定义了以下队列:

就绪队列(RQ):包含线程池中任何线程可以立即执行的所有任务

Entry Queue (EQ):包含用户想要执行的所有任务以及内部管理任务。 EQ 是一个优先队列。管理任务具有最高优先级。

Channels Queues (CQ):对于每个通道,都有一个内部通道队列,用于保存任务的顺序,例如确保任务按照添加到 EQ 的顺序顺序执行

调度程序:从 EQ 获取任务的专用线程。如果任务是用户任务,则将其添加到任务添加到的通道的 CQ。如果 CQ 的头部等于刚刚插入的用户任务,它也会被添加到 EQ(但仍保留在 CQ 中),以便在线程池的下一个线程可用时立即执行。

如果用户任务已完成执行,则将内部任务 TaskFinished 添加到 RQ。当由调度程序执行时,head 取自关联的 CQ。如果获取后 CQ 不为空,则从 CQ 轮询(但不获取)下一个任务并添加到 RQ。 TaskFinished 任务的优先级高于用户任务。

在我看来,这种方法没有任何逻辑错误。注意 EQ 和 RQ 需要同步。我更喜欢使用来自 JDK8 的 TransferQueue,它非常快,并且在检查它是否为空时,轮询头项也非常快。 CQ 不需要同步,因为它们始终只能由调度程序访问。

到目前为止,我对这个解决方案非常满意。让我想到的是调度程序是否会变成瓶颈。如果 EQ 中的任务比它可以处理的多得多,则 EQ 可能会增加一些积压。对此的任何意见将不胜感激:-)

【问题讨论】:

  • 我记得 Spring 在某处有一些“顺序”关键字。你考虑过JMS吗?这也可能是stackoverflow.com/questions/7192223/… 的副本。编辑 Executors.newSingleThreadExecutor()
  • 等待;如果您希望它们按顺序执行,为什么要首先使用并行性?
  • JMS 不是一个选项,因为它都是 VM 本地的,例如没有理由离开 VM 堆空间并因此而降低性能。我有一个多核处理器。我想确保他们都很忙。这就是并行性发挥作用的原因;-)。
  • 您希望一个队列中的任务按顺序执行,但您不要求不同队列中的任务按特定顺序和/或顺序执行,对吗?
  • @Adrian:是的,这是正确的。

标签: java multithreading scheduling threadpoolexecutor


【解决方案1】:

如果您使用的是 Java 7 或 Java 8,则可以使用 Fork Join Framework

您可以使用每个队列中弹出的第一个元素创建RecursiveTask
请记住将队列的引用提供给相应的RecursiveTasks。

一次调用所有的。 (在循环或流中)。

现在在compute 方法的末尾(完成任务处理后),通过从相应队列中弹出另一个元素并在其上调用invoke 来创建另一个RecursiveTask


注意事项:

  • 每个任务都将负责从队列中提取新元素,因此队列中的所有任务都将按顺序执行。
  • 应该为队列中的每个元素分别创建和调用一个新的RecursiveTask。这可确保某些队列不会占用线程并避免饥饿。
  • 使用 ExecutorService 也是一个可行的选择,但如果对您的用例更友好,IMO ForkJoin 的 API

希望这会有所帮助。

【讨论】:

  • 使用 fork join 框架看起来很有趣。使用 ExecutorService 时,我可以实现一些功能,当所有线程都忙时,将抛出异常,或者在将新任务添加到全局队列时返回 false(请参阅我在最初问题的更新部分中写的内容)。用户知道他的机器已经跪了,它应该将任务发送给其他机器来执行。
  • 如果用户不想等待机器,那么队列就没有意义了。重新考虑您的要求,然后进行设计将是 IMO 的一个好主意。祝你好运。
【解决方案2】:

一个简单的解决方案是在将元素添加到空队列时创建一个任务。该任务将只负责该队列,并在队列结束时结束。确保 Queue 实现是线程安全的,并且在删除最后一个元素后任务停止。

编辑:这些任务应该添加到具有内部队列的 ThreadPoolExecutor 中,例如由 ExecutorService.newFixedThreadPool 创建的一个,它将与有限数量的线程并行处理这些任务。


或者,只需将队列划分为固定数量的线程:

public class QueueWorker implements Runnable {
    // should be unique and < NUM_THREADS:
    int threadId;

    QueueWorker(int threadId) {
        this.threadId = threadId;
    }

    @Override
    public void run() {
        int currentQueueIndex = threadId;
        while (true) {
            Queue currentQueue = queues.get(currentQueue);
            // execute tasks until empty
            currentQueueIndex += NUM_THREADS;
            if (currentQueueIndex > queues.size()) {
                currentQueueIndex = threadId;
            }
        }
    }
}

【讨论】:

  • 感谢 Adrian,我在问题更新部分的 cmets。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多