【问题标题】:Is adding tasks to BlockingQueue of ThreadPoolExecutor advisable?是否建议向 ThreadPoolExecutor 的 BlockingQueue 添加任务?
【发布时间】:2011-07-31 22:50:36
【问题描述】:

ThreadPoolExecutor 的 JavaDoc 不清楚是否可以直接将任务添加到支持执行程序的BlockingQueueThe docs say 调用 executor.getQueue() 是“主要用于调试和监控”。

我正在用我自己的BlockingQueue 构建一个ThreadPoolExecutor。我保留了对队列的引用,因此可以直接向其中添加任务。 getQueue() 返回相同的队列,因此我假设 getQueue() 中的警告适用于对通过我的手段获得的后备队列的引用。

示例

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer()executor.execute()

据我了解,典型用途是通过executor.execute() 添加任务。上面示例中的方法具有阻塞队列的好处,而如果队列已满并拒绝我的任务,execute() 会立即失败。我也喜欢提交作业与阻塞队列交互;这对我来说感觉更“纯粹”的生产者-消费者。

直接将任务添加到队列的含义:我必须调用prestartAllCoreThreads(),否则没有工作线程正在运行。假设没有与执行程序的其他交互,则不会监控队列(ThreadPoolExecutor 源的检查证实了这一点)。这也意味着对于直接排队,ThreadPoolExecutor 必须另外配置为 > 0 个核心线程,并且不得配置为允许核心线程超时。

tl;博士

给定一个ThreadPoolExecutor,配置如下:

  • 核心线程 > 0
  • 核心线程不允许超时
  • 核心线程已预先启动
  • 保留对支持执行程序的 BlockingQueue 的引用

是否可以直接将任务添加到队列而不是调用executor.execute()

相关

这个问题 (producer/consumer work queues) 类似,但不具体涉及直接添加到队列中。

【问题讨论】:

  • “我保留了对队列的引用,因此我可以直接向其中添加任务”但是您为什么要这样做?为什么不直接将它们提交给执行者?
  • @Raedwald,见上面我写的地方“我上面例子中的方法有利于阻塞队列......”

标签: java concurrency producer-consumer executorservice blockingqueue


【解决方案1】:

如果需要,我们还可以使用一个将主要处理与拒绝任务分开的停车场 -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();

【讨论】:

    【解决方案2】:

    如果您使用的队列与标准内存中的LinkedBlockingQueueArrayBlockingQueue 完全不同,这是一个非常重要的问题。

    例如,如果您在不同机器上使用多个生产者来实现生产者-消费者模式,并使用基于单独的持久性子系统(如 Redis)的排队机制,那么问题本身就会变得相关,即使您不想像 OP 那样阻塞 offer()

    因此,必须调用prestartAllCoreThreads()(或足够多次prestartCoreThread())才能使工作线程可用和运行,这一给定答案非常重要,值得强调。

    【讨论】:

      【解决方案3】:

      可以通过在实例化时指定RejectedExecutionHandler 来实际配置队列已满时池的行为。 ThreadPoolExecutor 定义了四个策略作为内部类,包括AbortPolicyDiscardOldestPolicyDiscardPolicy,以及我个人最喜欢的CallerRunsPolicy,它在控制线程中运行新作业。

      例如:

      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
              nproc, // core size
              nproc, // max size
              60, // idle timeout
              TimeUnit.SECONDS,
              new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
              new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.
      

      问题中所需的行为可以通过以下方式获得:

      public class BlockingPolicy implements RejectedExecutionHandler {
          void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              executor.getQueue.put(r); // Self contained, no queue reference needed.
          }
      

      在某些时候队列必须被访问。这样做的最佳位置是在一个自包含的RejectedExecutionHandler 中,它可以保存任何代码重复或由于在池对象范围内直接操作队列而产生的潜在错误。请注意,ThreadPoolExecutor 中包含的处理程序本身使用 getQueue()

      【讨论】:

      • 我喜欢你提出另一种选择。我当时的观点是 ThreadPoolExecutor 是一个实现细节。我可以使用阻塞队列为生产者/消费者建模,并在不更改客户端代码的情况下交换工作安排的实现。
      【解决方案4】:

      一个技巧是实现 ArrayBlockingQueue 的自定义子类并覆盖 offer() 方法来调用您的阻塞版本,然后您仍然可以使用正常的代码路径。

      queue = new ArrayBlockingQueue<Runnable>(queueSize) {
        @Override public boolean offer(Runnable runnable) {
          try {
            return offer(runnable, 1, TimeUnit.HOURS);
          } catch(InterruptedException e) {
            // return interrupt status to caller
            Thread.currentThread().interrupt();
          }
          return false;
        }
      };
      

      (您可能已经猜到了,我认为直接在队列中调用 offer 作为您的正常代码路径可能是个坏主意)。

      【讨论】:

        【解决方案5】:

        如果是我,我更喜欢使用Executor#execute() 而不是Queue#offer(),因为我已经在使用java.util.concurrent 中的所有其他内容了。

        你的问题很好,引起了我的兴趣,所以我查看了ThreadPoolExecutor#execute()的来源:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                if (runState == RUNNING && workQueue.offer(command)) {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
                else if (!addIfUnderMaximumPoolSize(command))
                    reject(command); // is shutdown or saturated
            }
        }
        

        我们可以看到 execute 本身在工作队列上调用 offer(),但在必要时执行一些漂亮、美味的池操作之前不会。出于这个原因,我认为最好使用execute();不使用它可能(尽管我不确定)会导致池以非最佳方式运行。但是,我不认为使用 offer()破坏执行程序 - 看起来任务是使用以下(也来自 ThreadPoolExecutor)从队列中拉出的:

        Runnable getTask() {
            for (;;) {
                try {
                    int state = runState;
                    if (state > SHUTDOWN)
                        return null;
                    Runnable r;
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();
                    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    else
                        r = workQueue.take();
                    if (r != null)
                        return r;
                    if (workerCanExit()) {
                        if (runState >= SHUTDOWN) // Wake up others
                            interruptIdleWorkers();
                        return null;
                    }
                    // Else retry
                } catch (InterruptedException ie) {
                    // On interruption, re-check runState
                }
            }
        }
        

        这个getTask() 方法只是在一个循环中调用,所以如果执行器没有关闭,它会阻塞,直到一个新任务被分配给队列(不管它来自哪里)。

        注意:尽管我已经从源代码中发布了代码 sn-ps,但我们不能依赖它们来获得明确的答案 - 我们应该只对 API 进行编码。我们不知道execute() 的实现会随着时间的推移而发生怎样的变化。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-03-02
          • 2011-11-25
          • 2019-08-21
          • 1970-01-01
          • 2011-12-12
          相关资源
          最近更新 更多