【问题标题】:ThreadPoolExecutor - Changing the pool size even if no threads become idleThreadPoolExecutor - 即使没有线程空闲也改变池大小
【发布时间】:2016-04-10 10:18:48
【问题描述】:

我需要一些帮助来更好地了解 ThreadPoolExecutor 的工作原理。

我有 1000 个任务要处理,这个数字在我的程序开始时是已知的。

我设置了一个 ThreadPoolExecutor 并想动态更改它使用的线程数,以便如果服务器不那么活跃(例如在晚上),我们可以增加它可以使用的线程数。

增加线程数效果很好,我遇到的问题是当我尝试使用 setCorePoolSize 减少线程数时。

据我了解,这个值只有在线程空闲时才会改变。由于不断有任务要处理,因此该线程永远不会空闲,因此永远不会关闭。

那么我应该使用什么来减少线程数?

这是我的演示代码:

package com.test.executortest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 *
 * @author peter.marcoen
 */
public class ExecutorTest {

    final static Logger log = LogManager.getLogger(ExecutorTest.class);

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor x = new ThreadPoolExecutor(1, 100, 30, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());
        for (int i = 1; i <= 10; i++) {
            RunnableTest r = new RunnableTest();
            r.num = String.valueOf(i);
            x.execute(r);
        }

        x.shutdown();

        int i = 0;
        while (!x.isTerminated()) {
            i++;
            log.debug("Active count: {}, Core pool size: {}, Maximum pool size: {}, Pool size: {}", x.getActiveCount(), x.getCorePoolSize(), x.getMaximumPoolSize(), x.getPoolSize());
            if (i == 2) {
                log.info("!!!! Setting core pool size to 2 !!!!");
                x.setCorePoolSize(2);    
            } else if (i == 10) {
                log.info("!!!! Setting core pool size to 1 !!!!");
                x.setCorePoolSize(1);
            }
            Thread.sleep(1000);
        }
    }
}

RunnableTest 只休眠 5 秒:

package com.test.executortest;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.logging.log4j.LogManager;

/**
 *
 * @author peter.marcoen
 */
public class RunnableTest implements Runnable {
    public String num;
    final static org.apache.logging.log4j.Logger log = LogManager.getLogger(RunnableTest.class);

    @Override
    public void run() {
        log.debug("Started parsing #{}", num);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ex) {
            Logger.getLogger(RunnableTest.class.getName()).log(Level.SEVERE, null, ex);
        }
        log.debug("Finished parsing #{}", num);
    }

}

这是输出。如您所见,即使将 coreThreads 设置回 1,也始终有 2 个任务正在处理:

11:43:45.696 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:43:45.696 - Started parsing #1
11:43:46.710 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:43:46.710 - !!!! Setting core pool size to 2 !!!!
11:43:46.710 - Started parsing #2
11:43:47.724 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:48.738 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:49.738 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:50.705 - Finished parsing #1
11:43:50.705 - Started parsing #3
11:43:50.752 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:51.719 - Finished parsing #2
11:43:51.719 - Started parsing #4
11:43:51.766 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:52.780 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:53.794 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:54.795 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:54.795 - !!!! Setting core pool size to 1 !!!!
11:43:55.715 - Finished parsing #3
11:43:55.715 - Started parsing #5
11:43:55.809 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:56.729 - Finished parsing #4
11:43:56.729 - Started parsing #6
11:43:56.822 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:57.836 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:58.851 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:59.865 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:00.723 - Finished parsing #5
11:44:00.723 - Started parsing #7
11:44:00.879 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:01.739 - Finished parsing #6
11:44:01.739 - Started parsing #8
11:44:01.880 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:02.894 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:03.908 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:04.922 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:05.733 - Finished parsing #7
11:44:05.733 - Started parsing #9
11:44:05.936 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:06.749 - Finished parsing #8
11:44:06.749 - Started parsing #10
11:44:06.936 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:07.937 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:08.937 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:09.951 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:10.747 - Finished parsing #9
11:44:10.965 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:44:11.762 - Finished parsing #10

【问题讨论】:

标签: java multithreading threadpool threadpoolexecutor


【解决方案1】:

过去我创建了一个辅助ThreadPoolExecutor 来从核心执行程序窃取任务,而不是动态更改核心ThreadPoolExecutor 的大小(您发现这可能有点不稳定) - 这个允许我在不影响核心执行器的情况下终止辅助执行器。理想情况下,我们只能让两个执行程序共享一个工作队列,但这似乎引入了奇怪的并发错误,而且麻烦多于值得 - 最好将队列分开并使用 afterExecute 保留辅助执行者池已满。

public class AuxiliaryExecutor extends ThreadPoolExecutor {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<Runnable> coreQueue;

    private void pollAndExecute() {
        try {
            this.execute(coreQueue.poll());
        } catch(NullPointerException e) {
            this.execute(new Runnable() {
                public void run() {
                    try {
                        this.execute(coreQueue.take());
                    } catch(InterruptedException e) {
                        return;
                    }
                }
            });
        }
    }

    public AuxiliaryExecutor(ThreadPoolExecutor coreExecutor, int poolSize) {
        super(poolSize, poolSize, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(poolSize));
        this.coreQueue = coreExecutor.getQueue();
        for(int i = 0; i < poolSize; i++) {
            pollAndExecute();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if(!shutdown.get()) {
            pollAndExecute();
        }
    }

    @Override
    public void shutdown() {
        shutdown.set(true);
    }
}

如果你想改变辅助执行器的大小,那么你可以关闭辅助执行器并用一个新的调整大小的执行器替换它。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-19
    • 2021-05-28
    • 1970-01-01
    • 1970-01-01
    • 2019-08-18
    • 2019-12-03
    相关资源
    最近更新 更多