【问题标题】:ThreadPoolExecutor fixed thread pool with custom behaviourThreadPoolExecutor 修复了具有自定义行为的线程池
【发布时间】:2012-10-28 13:24:08
【问题描述】:

我是这个主题的新手......我正在使用用 Executors.newFixedThreadPool( 10 ) 创建的 ThreadPoolExecutor ,在池已满后,我开始收到 RejectedExecutionException 。 有没有办法“强制”执行者将新任务置于“等待”状态,而不是拒绝它并在池被释放时启动它?

谢谢

关于此的问题 https://github.com/evilsocket/dsploit/issues/159

涉及的代码行https://github.com/evilsocket/dsploit/blob/master/src/it/evilsocket/dsploit/net/NetworkDiscovery.java#L150

【问题讨论】:

  • 你找到解决这个问题的方法了吗?

标签: java multithreading threadpool threadpoolexecutor


【解决方案1】:

如果您使用Executors.newFixedThreadPool(10);,它会将任务排队并等待线程准备好。

这个方法是

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

如您所见,使用的队列是无限的(这本身可能是一个问题),但这意味着队列永远不会填满,您也永远不会被拒绝。

顺便说一句:如果您有 CPU 密集型任务,则最佳线程数可以是

int processors = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(processors);

一个可以说明情况的测试类

public static void main(String... args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 1000 * 1000; i++)
        es.submit(new SleepOneSecond());

    System.out.println("Queue length " + ((ThreadPoolExecutor) es).getQueue().size());
    es.shutdown();
    System.out.println("After shutdown");
    try {
        es.submit(new SleepOneSecond());
    } catch (Exception e) {
        e.printStackTrace(System.out);
    }
}

static class SleepOneSecond implements Callable<Void> {
    @Override
    public Void call() throws Exception {
        Thread.sleep(1000);
        return null;
    }
}

打印

Queue length 999998
After shutdown
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@e026161 rejected from java.util.concurrent.ThreadPoolExecutor@3e472e76[Shutting down, pool size = 2, active threads = 2, queued tasks = 999998, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2013)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at Main.main(Main.java:17)

【讨论】:

  • 仍然,我得到了 RejectedExecutionException,你可以在这里看到github.com/evilsocket/dsploit/issues/159
  • @SimoneMargaritelli 使用newFixedThreadPool 时获得RejectedExecutionException 的唯一原因是,如果您在调用shutdown 方法后尝试将任务提交给执行程序。跨度>
  • 鉴于池只运行过 31 个任务,我怀疑它没有满。此外,corePoolSize 为 0,表明它已被关闭,这将触发拒绝。
  • @SimoneMargaritelli BTW 您不需要强制转换为 ThreadPoolExecutor,我会将其保留为接口 ExecutorService。
【解决方案2】:

很有可能一个线程调用exit,将mStopped设置为false并关闭执行器,但是:

  • 您正在运行的线程可能处于while (!mStopped) 循环的中间并尝试向已被exit 关闭的执行程序提交任务
  • while 中的条件返回 true,因为对 mStopped 所做的更改不可见(您不使用任何形式的围绕该标志的同步)。

我建议:

  • 使mStopped易变
  • 处理执行程序在循环中间关闭的情况(例如通过捕获 RejectedExecutionException,或者可能更好:在 while 循环之后关闭执行程序,而不是在退出方法中关闭它)。

【讨论】:

    【解决方案3】:

    根据之前的建议,您可以使用阻塞队列来构造固定大小的ThreadPoolExecutor。如果您随后提供自己的 RejectedExecutionHandler 将任务添加到阻塞队列,它将按照描述的方式运行。

    这是一个如何构建这样一个执行器的示例:

    int corePoolSize    = 10;
    int maximumPoolSize = 10;
    int keepAliveTime   =  0;
    int maxWaitingTasks = 10;
    
    ThreadPoolExecutor blockingThreadPoolExecutor = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize,
            keepAliveTime, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(maxWaitingTasks),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Interrupted while submitting task", e);
                    }
                }
            });
    

    【讨论】:

      【解决方案4】:

      如果我理解正确,您的 ThreadPool 使用固定数量的线程创建,但您可能有更多任务要提交到线程池。我会根据请求计算 keepAliveTime 并动态设置它。这样你就不会有 RejectedExecutionException。

      例如

      long keepAliveTime = ((applications.size() * 60) / FIXED_NUM_OF_THREADS) * 1000; threadPoolExecutor.setKeepAliveTime(keepAliveTime, TimeUnit.MILLISECONDS);

      其中应用程序是每次都可能不同的任务集合。

      如果您知道任务的平均时间,那应该可以解决您的问题。

      【讨论】:

        猜你喜欢
        • 2012-04-28
        • 2010-11-05
        • 1970-01-01
        • 1970-01-01
        • 2010-09-27
        • 2014-02-05
        • 1970-01-01
        • 1970-01-01
        • 2012-01-21
        相关资源
        最近更新 更多