【问题标题】:ThreadPoolExecutor and the queueThreadPoolExecutor 和队列
【发布时间】:2012-08-05 21:12:40
【问题描述】:

我认为使用ThreadPoolExecutor 我们可以提交Runnables 以在构造函数中传递的BlockingQueue 或使用execute 方法中执行。
我的理解是,如果有任务可用,它将被执行。
我不明白的是:

public class MyThreadPoolExecutor {  

    private static ThreadPoolExecutor executor;  

    public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){  
        executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue);   
        //executor.prestartAllCoreThreads();  
    }  

    public static void main(String[] main){
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
        final String[] names = {"A","B","C","D","E","F"};  
        for(int i = 0; i < names.length; i++){  
            final int j = i;  
            q.add(new Runnable() {  

                @Override  
                public void run() {  
                    System.out.println("Hi "+ names[j]);  

                }  
            });         
        }  
        new MyThreadPoolExecutor(10, 20, 1, q);   
        try {  
            TimeUnit.SECONDS.sleep(5);  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        /*executor.execute(new Runnable() {  

            @Override  
            public void run() {  

                System.out.println("++++++++++++++");  

            }   
        });  */
        for(int i = 0; i < 100; i++){  
            final int j = i;  
            q.add(new Runnable() {   

                @Override  
                public void run() {  
                    System.out.println("Hi "+ j);  

                }  
            });  
        }   


    }  


}

除非我在构造函数中取消注释executor.prestartAllCoreThreads();,或者我调用打印System.out.println("++++++++++++++"); 的runnable 的execute(它也被注释掉),否则这段代码不会做任何事情。

为什么?
引用(我的重点):

默认情况下,即使是核心线程也是最初创建和仅启动 当新任务到达时,但这可以使用动态覆盖 方法 prestartCoreThread() 或 prestartAllCoreThreads()。你可能 如果您使用非空构造池,则希望预启动线程 队列。

好的。所以我的队列不是空的。但是我创建了executor,我做了sleep,然后我将新的Runnables 添加到队列中(在循环中到100)。
这个循环不算new tasks arrive吗?
为什么它不起作用,我必须prestart 或显式调用execute

【问题讨论】:

  • 工作线程在execute 到达任务时产生,这些线程与底层工作队列交互。如果您从非空工作队列开始,则需要预先启动工作人员。请参阅implementation in OpenJDK 7
  • @veer:是的,我知道。但是我从一个非空队列开始,然后开始添加到队列中。为什么我需要预先启动工人?为什么我每次都需要查看实施?规范是错误的,或者我无法理解它。那么它是什么?
  • 同样,工人是直接与队列交互的人。它们仅在通过execute(或其上方的层,例如invokeAllsubmit 等)传递时按需生成。见executehere的代码。
  • @veer:当我预启动工作人员时,代码无需调用execute即可运行
  • 是的,那是因为工作人员已经启动,因此他们能够检查队列中的可用工作

标签: java multithreading concurrency java.util.concurrent threadpoolexecutor


【解决方案1】:

工作线程在任务通过执行到达时产生,这些线程与底层工作队列交互。如果您从非空工作队列开始,则需要预先启动工作人员。请参阅implementation in OpenJDK 7

我再说一遍,工人是与工作队列交互的人。它们仅在通过 execute 传递时按需生成。 (或它上面的层,例如invokeAllsubmit 等)如果它们没有启动,那么添加到队列中的工作量无关紧要,因为没有任何检查它,因为 有没有工人开始

ThreadPoolExecutor 不会生成工作线程,除非有必要,或者如果您通过方法 prestartAllCoreThreadsprestartCoreThread 抢先创建它们。如果没有工作人员启动,那么队列中的任何工作都无法完成。

添加初始execute 起作用的原因是它强制创建sole 核心工作线程,然后它可以开始处理队列中的工作。您也可以致电 prestartCoreThread 并收到类似的行为。如果您想启动所有工人,您必须致电prestartAllCoreThreads 或通过execute 提交该数量的任务。

查看下方execute 的代码。

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

【讨论】:

  • 您没有回答关于规范的 OP。您通过输入 JVM 实现的 one 的实现细节来回答(对于 Oracle 或 IBM 等可能不同)
  • Worker threads are spawned as tasks arrive by execute。此外,如果工作线程已启动,我可以直接将任务添加到队列中,而无需调用 execute
  • 我非常有意识地将一个实现的实现细节(实际上与 Oracle JDK 中的那个相同)。 OP 是代码如此工作的原因,我对此进行了解释。 ThreadPoolExecutor 不会生成工作线程,除非有必要,或者如果您通过 prestartAllCoreThreadsprestartCoreThread 方法抢先创建它们。
  • 如果您在调用execute 一次后将工作添加到队列并且它可以工作,那么您已经成功生成了一个单独单个工作线程> 正在处理工作队列。
  • 只要您有工作线程等待消耗工作,您就可以通过队列进行交互,因此文档建议您为非空工作队列预先启动它们。通过调用execute 一次“启动”执行程序只会启动一个工作人员,即不是整个池。
【解决方案2】:

BlockingQueue 不是神奇的线程调度程序。如果你将 Runnable 对象提交到队列中,并且没有正在运行的线程来消费这些任务,那么它们当然不会被执行。另一方面,如果需要,execute 方法会根据线程池配置自动调度线程。如果你预先启动了所有的核心线程,那里就会有线程来消费队列中的任务。

【讨论】:

    猜你喜欢
    • 2018-05-16
    • 1970-01-01
    • 2011-03-27
    • 1970-01-01
    • 1970-01-01
    • 2018-06-24
    • 2023-03-16
    • 2012-03-26
    • 1970-01-01
    相关资源
    最近更新 更多