【发布时间】:2015-04-17 04:50:49
【问题描述】:
我有一个包含 m 个线程的线程池。假设 m 是 10 并修复。然后有 n 个队列,有可能 n 变大(例如 100'000 或更多)。每个队列都包含要由这 m 个线程执行的任务。现在,非常重要的是,每个队列都必须按任务顺序处理。这是确保任务按照添加到队列的顺序执行的要求。否则,数据可能会变得不一致(例如,与 JMS 队列相同)。
所以现在的问题是如何确保这 n 个队列中的任务由可用的 m 个线程处理,使得添加到同一个队列的任务不能被不同的线程“同时”执行。
我尝试自己解决这个问题,发现它要求很高。 Java ThreadPoolExecutor 很好,但是您必须添加很多不容易开发的功能。所以问题是,是否有人知道已经解决了这个问题的 Java 框架或系统?
更新
感谢 Adrian 和 Tanmay 的建议。队列的数量可能非常大(例如 100'000 或更多)。所以每个队列一个线程是不可能的,尽管它很简单。我将研究 fork join 框架。看起来是一条有趣的道路。
我当前的第一个迭代解决方案是有一个全局队列,所有任务都添加到该队列中(使用 JDK8 TransferQueue,它的锁定开销很小)。任务被包装到带有队列锁及其大小的队列存根中。队列本身在物理上并不存在,只有它的存根。
空闲线程首先需要获取令牌才能访问全局队列(令牌将是阻塞队列中的单个元素,例如 JDK8 TransferQueue)。然后它对全局队列进行阻塞处理。当获得一个任务时,它会检查该任务的队列存根的队列锁是否关闭。实际上,我认为仅使用 AtomicBoolean 就足够了,并且比锁或同步块创建的锁争用更少。
当获得队列锁后,将token返回到全局队列并执行任务。如果未获得,则将任务添加到 2 级队列,并完成来自全局队列的另一个阻塞获取。线程需要检查第二级队列是否为空,并从中获取任务以执行。
此解决方案似乎有效。但是,每个线程在被允许访问全局队列之前需要获取令牌,而二级队列看起来像是一个瓶颈。我相信它会造成高锁争用。所以,我对此不太满意。也许我从这个解决方案开始并详细说明它。
更新 2
好的,现在这里是我迄今为止提出的“最佳”解决方案。定义了以下队列:
就绪队列(RQ):包含线程池中任何线程可以立即执行的所有任务
Entry Queue (EQ):包含用户想要执行的所有任务以及内部管理任务。 EQ 是一个优先队列。管理任务具有最高优先级。
Channels Queues (CQ):对于每个通道,都有一个内部通道队列,用于保存任务的顺序,例如确保任务按照添加到 EQ 的顺序顺序执行
调度程序:从 EQ 获取任务的专用线程。如果任务是用户任务,则将其添加到任务添加到的通道的 CQ。如果 CQ 的头部等于刚刚插入的用户任务,它也会被添加到 EQ(但仍保留在 CQ 中),以便在线程池的下一个线程可用时立即执行。
如果用户任务已完成执行,则将内部任务 TaskFinished 添加到 RQ。当由调度程序执行时,head 取自关联的 CQ。如果获取后 CQ 不为空,则从 CQ 轮询(但不获取)下一个任务并添加到 RQ。 TaskFinished 任务的优先级高于用户任务。
在我看来,这种方法没有任何逻辑错误。注意 EQ 和 RQ 需要同步。我更喜欢使用来自 JDK8 的 TransferQueue,它非常快,并且在检查它是否为空时,轮询头项也非常快。 CQ 不需要同步,因为它们始终只能由调度程序访问。
到目前为止,我对这个解决方案非常满意。让我想到的是调度程序是否会变成瓶颈。如果 EQ 中的任务比它可以处理的多得多,则 EQ 可能会增加一些积压。对此的任何意见将不胜感激:-)
【问题讨论】:
-
我记得 Spring 在某处有一些“顺序”关键字。你考虑过JMS吗?这也可能是stackoverflow.com/questions/7192223/… 的副本。编辑 Executors.newSingleThreadExecutor()
-
等待;如果您希望它们按顺序执行,为什么要首先使用并行性?
-
JMS 不是一个选项,因为它都是 VM 本地的,例如没有理由离开 VM 堆空间并因此而降低性能。我有一个多核处理器。我想确保他们都很忙。这就是并行性发挥作用的原因;-)。
-
您希望一个队列中的任务按顺序执行,但您不要求不同队列中的任务按特定顺序和/或顺序执行,对吗?
-
@Adrian:是的,这是正确的。
标签: java multithreading scheduling threadpoolexecutor