【问题标题】:Why ThreadPoolExecutor has BlockingQueue as its argument?为什么 ThreadPoolExecutor 有 BlockingQueue 作为它的参数?
【发布时间】:2011-11-25 06:26:19
【问题描述】:

我已尝试使用

创建和执行 ThreadPoolExecutor
int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

如果我连续尝试第 7、8 个...任务

  threadPool.execute(task);  

队列达到最大大小后
开始抛出“RejectedExecutionException”。意味着我失去了添加这些任务。

如果 BlockingQueue 缺少任务,那么它的作用是什么?意味着为什么它不等待?

从BlockingQueue的定义

另外支持等待队列的操作的队列 检索元素时变为非空,并等待空间 存储元素时在队列中可用。


为什么我们不能使用链表(普通队列实现而不是阻塞队列)?

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    您没有按照预期的方式使用 BlockingQueue。

    BlockingQueue 用于实现producer consumer pattern。生产者线程通过阻塞put() 方法将项目放入队列,而消费者线程通过阻塞take() 方法从队列中取出项目。

    put() 阻塞 - 表示如果队列已满,它会等到消费者从队列中取出一个项目后再添加它,然后返回。

    take() 阻塞 - 表示如果队列为空,它会等到生产者将一个项目放入队列,然后再取出并返回它。

    这种模式将生产者与消费者完全断开,除了他们共享队列。

    尝试像这样使用队列:让执行程序运行一些充当生产者的线程和一些充当消费者的线程。

    【讨论】:

    • 我不知道你为什么认为他没有按照预期的方式使用BlockingQueue,除非你认为Java并发库的设计者没有以这种方式使用队列它旨在被使用。我可能会严重误解您的回答,但我很确定阻塞队列的目的是像 ThreadPoolExecutor 那样使用。
    • 当然,但他没有实现消费者端或将工作放在队列中 - 他只是提交任务。当然,队列会填满 - 没有什么可以清空它。
    • TheradPoolExecutor 上调用execute(即提交任务)会将任务放入阻塞队列,一旦执行器的线程可用,它将从阻塞队列中取出任务并执行它。如果任务不是太长并且线程池中有足够的线程,那么队列最终会被清空。生产者/消费者设计模式在ThreadPoolExecutor 本身内部实现。
    • 同意 Lirik,我认为生产者消费者模式必须由 ThreadPoolExecutor API 处理,因为它负责将任务添加到阻塞队列中。如果队列已满,它应该等到队列被清空,然后它应该添加任务。但是不是等待而是抛出无法添加任务的异常。所以 ThreadPoolExecutor 在使用阻塞队列时违反了生产者/消费者的设计。请澄清一下。
    • @Bohemian 您的观点是人们期望 Executor+BlockingQueue 的工作方式。但实际上它不是这样工作的,这就是提出这个问题的原因。
    【解决方案2】:

    出现问题是因为您的任务队列太小,execute 方法的文档说明了这一点:

    在未来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果任务无法提交执行,要么是因为这个执行器已经关闭,要么是因为它的容量已经达到,任务由当前的 RejectedExecutionHandler 处理。

    所以第一个问题是您将队列大小设置为非常小的数字:

    int poolSize = 2;
    int maxPoolSize = 3;
    ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
    

    然后您声明“如果 [我] 尝试第 7、第 8... 个任务”,那么您将收到 RejectedExecutionException,因为您已经超出了队列的容量。有两种方法可以解决您的问题(我建议两者都做):

    1. 增加队列的大小。
    2. 捕获异常并重新尝试添加任务。

    你应该有这样的东西:

    public void ExecuteTask(MyRunnableTask task) {
        bool taskAdded = false;
        while(!taskAdded) {
            try {
                executor.execute(task);
                taskAdded = true;
            } catch (RejectedExecutionException ex) {
                taskAdded = false;
            }
        }   
    }
    

    现在,解决您的其他问题...

    如果 BlockingQueue 缺少任务,那么它的作用是什么?

    BlockingQueue 的作用是完成生产者/消费者模式,如果它足够大,那么您应该看不到您遇到的问题。正如我上面提到的,您需要增加队列大小并捕获异常,然后重试执行任务。

    为什么我们不能使用链表?

    链表既不是线程安全的,也不是阻塞的。生产者/消费者模式往往最适合使用阻塞队列。

    更新

    请不要被以下陈述冒犯,我故意使用更严格的语言来强调您的第一个假设永远不应该是有问题您正在使用的库(除非您自己编写了该库并且您知道其中存在特定问题)!

    因此,让我们现在解决这个问题:ThreadPoolExecutor 和 Java 库都不是这里的问题。完全是您(错误)使用库导致了问题。 Javmex 有一个 great tutorial 解释您所看到的确切情况。

    填满队列的速度比清空队列的速度快可能有以下几个原因:

    1. 添加任务执行的线程添加任务太快。
    2. 任务执行时间过长。
    3. 您的队列太小了。
    4. 以上 3 项的任意组合。

    还有很多其他原因,但我认为以上是最常见的。

    我会给你一个simple solution with an unbounded queue,但它不会解决你对图书馆的(错误)使用。 因此,在我们责怪 Java 库之前,让我们看一个简明的示例来演示您遇到的确切问题。

    更新 2.0

    以下是针对特定问题的其他几个问题:

    1. ThreadPoolExecutor Block When Queue Is Full?
    2. How to make ThreadPoolExecutor's submit() method block if it is saturated?

    【讨论】:

    • 我相信生产者消费者模式必须由 ThreadPoolExecutor API 处理,因为它负责将任务添加到阻塞队列中。如果队列已满,它应该等到队列被清空,然后它应该添加任务。但是不是等待而是抛出无法添加任务的异常。所以 ThreadPoolExecutor 在使用阻塞队列时违反了生产者/消费者的设计。请澄清一下。
    • 即使我将队列大小增加到大不;当提交的任务超过此 szie && 队列已满时;我相信它会失去任务。那么我该如何避免呢?我需要去无限阻塞队列吗?
    • @Kanagavelu,一种解决方案是捕获异常并重新添加任务。但是,如果没有看到可以演示该问题的代码示例,就很难再发表评论了。此外,每个任务运行多长时间?有多少线程正在添加执行任务?这些任务添加执行的频率如何?
    • 问题只是为了理解他们为什么选择阻塞队列而不是唯一队列。我只是运行一个简单的示例来确保当队列已满时 API 是否正在等待添加其他任务。然后我发现不是。我认为这被称为 [link]en.wikipedia.org/wiki/Producer-consumer_problem" Producer-consumer_problem 发生在这里。
    • @Kanagavelu,我不是 100% 确定他们为什么在队列已满时不直接阻止 put,但我很确定他们会阻止 take。阻塞take 确保线程不必使用任何类型的轮询来检查队列是否有数据(这被认为是一种低效的方法),而是工作线程等待/阻塞,直到它发出信号表明队列中有东西排队,以便它可以出队。他们使用阻塞队列的另一个原因是因为它是线程安全的,因此在调用 put/take 时不需要用户同步。
    【解决方案3】:

    阻塞队列主要针对消费者(池中的线程)。线程可以等待队列中的新任务可用,它们将被自动唤醒。一个普通的链表不能达到这个目的。

    在生产者端,默认行为是在队列已满时抛出异常。这可以通过实现您自己的RejectedExceptionHandler 轻松定制。在您的处理程序中,您可以获取队列并调用 put 方法,该方法将阻塞直到有更多可用空间。

    但这不是一件好事 - 原因是如果这个执行器出现问题(死锁,处理缓慢),它将对系统的其余部分产生连锁反应。例如,如果您从 servlet 调用 execute 方法 - 如果 execute 方法阻塞,则所有容器线程都将被阻止,您的应用程序将停止。这可能是默认行为是抛出异常而不是等待的原因。也没有 RejectedExceptionHandler 的实现可以做到这一点 - 阻止人们使用它。

    有一个选项 (CallersRunPolicy) 可以在调用线程中执行,如果您希望处理发生,可以是另一个选项。

    一般规则是 - 最好让一个请求的处理失败,而不是让整个系统停机。您可能想了解circuit-breaker 模式。

    【讨论】:

    • 这很有帮助,尤其是您使用 Servlet 的示例。据我了解,消费者-生产者模式的这种实现是为了增加灵活性,这使得这个执行器成为通用工具,但是在这种流行的情况下,当你需要同时阻止消费者和生产者时,你必须编写一些代码(经典的 Java )
    • 值得注意的是,BlockingQueue 的默认行为是在队列已满时阻塞放置,但是当使用 ThreadPoolExecutor 时,它默认不使用此阻塞功能并通过抛出快速失败RejectedExecutionException 代替。
    【解决方案4】:

    你可以找到BlockingQueue的用途here

    /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.poll() returning
         * null necessarily means that workQueue.isEmpty(), so rely
         * solely on isEmpty to see if the queue is empty (which we must
         * do for example when deciding whether to transition from
         * SHUTDOWN to TIDYING).  This accommodates special-purpose
         * queues such as DelayQueues for which poll() is allowed to
         * return null even if it may later return non-null when delays
         * expire.
         */
        private final BlockingQueue<Runnable> workQueue;
    

    BlockingQueue 有助于为生产者消费者问题提供良好的解决方案。您可以在下面的示例中找到它的用法:

    Producer/Consumer threads using a Queue

    关于RejectedExecutionException,您没有为BlockingQueue 设置正确的大小。

    【讨论】:

      【解决方案5】:

      根据您的设置,如果并发超过 5 个,则请求可能会被拒绝。因为顺序将是池大小 -> 队列 -> 最大池大小。

      1. 将创建初始两个线程
      2. 之后,如果有超过 2 个请求到来,子序列请求将被放入队列。
      3. 如果队列已满(设置为 2),将创建新线程但不会超过最大池大小(设置为 3)。
      4. 如果有更多请求,并且所有线程/工作者都忙且队列已满,则请求将被拒绝(取决于您的拒绝策略配置)。

      更多细节可以从这里阅读:https://dzone.com/articles/scalable-java-thread-pool-executor

      【讨论】:

        【解决方案6】:

        问题是因为你的任务队列太小了

        恕我直言,这并不能回答 OP 问题(尽管 Kiril 在那之后提供了更多细节),因为队列的大小 太小 完全是主观的。例如,他可能正在保护一个无法处理两个以上并发请求的外部资源(除此之外,我认为 2 是为了进行快速测试)。另外,我们可以说多大的尺寸不算小? 1000?如果执行尝试执行 5000 个任务怎么办?场景仍然存在,因为真正的问题是如果 ThreadPoolExecutor 使用 LinkedBlockingQueue,为什么生产者或调用者线程没有被阻塞?

        BlockingQueue 的作用是完成生产者/消费者模式,如果它足够大,那么你应该看不到你遇到的问题。正如我上面提到的,您需要增加队列大小并捕获异常,然后重试执行任务。

        如果您知道可以在运行时排队的最大任务数,并且可以为此分配足够的堆,则这是正确的,但情况并非总是如此。使用有界队列的好处是您无法保护自己免受OutOfMemoryError 的侵害。

        正确答案应该是来自@gkamal 的答案。

        阻塞队列主要用于消费者(池中的线程)。线程可以等待队列中的新任务可用,它们将被自动唤醒。一个普通的链表不能达到这个目的。

        我的两分钱:

        开发人员可能已经决定消费者不会阻塞,在这种情况下将不再需要 BlockingQueue(尽管您仍然需要使用并发集合或手动处理同步),但在这种情况下,当工作人员尝试 轮询你必须从一个空集合中的一个任务

        1. 杀死线程(处理新任务创建新线程),或
        2. 忙着等待浪费cpu

        由于创建新线程是一项昂贵的任务(毕竟这是我们使用线程池的方式)并且会忙于等待,因此最好的选择是阻止该消费者并稍后在任务可用时通知。

        实际上,有一种特殊情况,当线程数超过配置的corePoolSize 时,worker 不会阻塞(至少不是无限期地)。

         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
        

        我们可以在 ThreadPoolExecutor (java 1.8) 中看到这两种场景

        /**
             * Performs blocking or timed wait for a task, depending on
             * current configuration settings, or returns null if this worker
             * must exit because of any of:
             * 1. There are more than maximumPoolSize workers (due to
             *    a call to setMaximumPoolSize).
             * 2. The pool is stopped.
             * 3. The pool is shutdown and the queue is empty.
             * 4. This worker timed out waiting for a task, and timed-out
             *    workers are subject to termination (that is,
             *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
             *    both before and after the timed wait, and if the queue is
             *    non-empty, this worker is not the last thread in the pool.
             *
             * @return task, or null if the worker must exit, in which case
             *         workerCount is decremented
             */
            private Runnable getTask() {
                boolean timedOut = false; // Did the last poll() time out?
        
                for (;;) {
                    int c = ctl.get();
                    int rs = runStateOf(c);
        
                    // Check if queue empty only if necessary.
                    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                        decrementWorkerCount();
                        return null;
                    }
        
                    int wc = workerCountOf(c);
        
                    // Are workers subject to culling?
                    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
                    if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                        if (compareAndDecrementWorkerCount(c))
                            return null;
                        continue;
                    }
        
                    try {
                        Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // <---- wait at most keepAliveTime
                            workQueue.take(); // <---- if there are no tasks, it awaits on notEmpty.await();
                        if (r != null)
                            return r;
                        timedOut = true;
                    } catch (InterruptedException retry) {
                        timedOut = false;
                    }
                }
            }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2019-08-21
          • 1970-01-01
          • 2023-03-16
          • 2022-10-17
          • 2016-02-18
          • 1970-01-01
          • 2021-11-15
          • 1970-01-01
          相关资源
          最近更新 更多