【发布时间】:2016-06-24 16:52:55
【问题描述】:
我正在试验 JAVA 线程池,想知道是否有办法在没有更多线程提交时关闭线程池,并在一个线程完成执行时提交线程。
更清楚地说,我有一个Integer 类型的THREAD_LIMIT 变量,这是可以执行并行和递归算法的最大线程数,假设在调用自身之前检查线程的活动数。如果活动线程数小于线程限制,它将向线程池提交一个新线程,否则在同一线程上调用递归。
我面临的问题是跟踪活动线程并在没有提交新线程时关闭线程池。我想使用多线程从我的代码中获得最大性能。
我按照this 教程创建了自己的线程池并使用了
public int getTaskQueueSize() {
return taskQueue.size();
}
在ThreadPool 类中获取活动线程数。
在我正在使用的主类中
void shutdownExecutor() {
while (tp.getTaskQueueSize() != 0) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Logger.getLogger(HamiltonPaths.class.getName()).log(Level.SEVERE, null, ex);
}
// System.out.println("Not STopping tp");
}
tp.shutdown();
// System.out.println("Stopped tp");
}
在主类中关闭线程池。但过了一段时间它停止产生新线程。但是一个线程继续以递归方式完成工作。
更新 1:添加代码
所以我发现向线程池提交任务工作正常。但是我不小心添加了一个最近更改代码的错误,这阻止了我向线程池提交更多任务并从shutdownExecutor() 函数关闭线程池,因为tp.getTaskQueueSize() 正在返回队列的初始大小或任务没有被删除来自阙。
我正在使用以下逻辑来决定是提交新任务还是继续递归工作。
if ((this.tp.getTaskQueueSize() < threadLimit) && (!this.tp.isPoolShutDownInitiated())) {
spawnNewThread(new PTask(cp, get));//Class that implements the Runnable and do the same thing as the function called in below statement.
} else {
PPath(cp, get);// call to the function
}
BlockingQueueCustom.java
package threadpool;
abstract interface BlockingQueueCustom<E>
{
public abstract void put(E paramE)
throws InterruptedException;
public abstract E take()
throws InterruptedException;
public abstract int size();
}
LinkedBlockingQueueCustom.java
package threadpool;
import java.util.LinkedList;
import java.util.List;
class LinkedBlockingQueueCustom<E>
implements BlockingQueueCustom<E> {
private List<E> queue;
private int maxSize;
public LinkedBlockingQueueCustom(int maxSize) {
this.maxSize = maxSize;
this.queue = new LinkedList();
}
public synchronized void put(E item)
throws InterruptedException {
if (this.queue.size() == this.maxSize) {
wait();
}
this.queue.add(item);
notifyAll();
}
public synchronized E take()
throws InterruptedException {
if (this.queue.isEmpty()) {
wait();
}
notifyAll();
if (this.queue.isEmpty()) {
return null;
}
return (E) this.queue.remove(0);
}
public synchronized int size() {
return this.queue.size();
}
}
ThreadPool.java
package threadpool;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ThreadPool {
private BlockingQueueCustom<Runnable> taskQueue;
int size = 0;
int taskExecuted = 0;
ThreadPoolsThread[] threadPoolsThread;
public int getTaskExecuted() {
return this.taskExecuted;
}
public synchronized void dectaskExec() {
this.taskExecuted -= 1;
}
public int getSize() {
return this.size;
}
public BlockingQueueCustom<Runnable> getTaskQueue() {
return this.taskQueue;
}
public int getTaskQueueSize() {
return this.taskQueue.size();
}
private boolean poolShutDownInitiated = false;
public ThreadPool(int nThreads) {
this.taskQueue = new LinkedBlockingQueueCustom(nThreads);
this.size = nThreads;
this.threadPoolsThread = new ThreadPoolsThread[nThreads + 1];
for (int i = 1; i <= nThreads; i++) {
this.threadPoolsThread[i] = new ThreadPoolsThread(this.taskQueue, this);
this.threadPoolsThread[i].setName("" + i);
this.threadPoolsThread[i].start();
}
}
public synchronized void execute(Runnable task) {
if (this.poolShutDownInitiated) {
try {
throw new Exception("ThreadPool has been shutDown, no further tasks can be added");
} catch (Exception ex) {
Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
}
}
this.taskExecuted += 1;
try {
this.taskQueue.put(task);
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
}
}
public boolean isPoolShutDownInitiated() {
return this.poolShutDownInitiated;
}
public synchronized void shutdown() {
this.poolShutDownInitiated = true;
}
}
ThreadPoolsThread.java
package threadpool;
import java.util.logging.Level;
import java.util.logging.Logger;
class ThreadPoolsThread
extends Thread {
private BlockingQueueCustom<Runnable> taskQueue;
private ThreadPool threadPool;
public ThreadPoolsThread(BlockingQueueCustom<Runnable> queue, ThreadPool threadPool) {
this.taskQueue = queue;
this.threadPool = threadPool;
}
public void run() {
for (;;) {
Runnable runnable = null;
while ((!this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
}
if ((this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
break;
}
try {
runnable = (Runnable) this.taskQueue.take();
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
break;
}
if (runnable == null) {
break;
}
runnable.run();
if ((this.threadPool.isPoolShutDownInitiated())
&& (this.taskQueue.size() == 0)) {
interrupt();
try {
Thread.sleep(1L);
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
break;
}
}
}
}
}
【问题讨论】:
-
您不会将线程提交到线程池。您提交任务以供池中的线程执行。
-
@Raedwald 感谢您纠正我的术语。
标签: java multithreading java-8 threadpool threadpoolexecutor