【问题标题】:JAVA Keep Submitting tasks to thread pool and Shutdown threadpool when no more tasks are executing?当没有更多任务正在执行时,JAVA继续向线程池提交任务并关闭线程池?
【发布时间】:2016-06-24 16:52:55
【问题描述】:

我正在试验 JAVA 线程池,想知道是否有办法在没有更多线程提交时关闭线程池,并在一个线程完成执行时提交线程。

更清楚地说,我有一个Integer 类型的THREAD_LIMIT 变量,这是可以执行并行和递归算法的最大线程数,假设在调用自身之前检查线程的活动数。如果活动线程数小于线程限制,它将向线程池提交一个新线程,否则在同一线程上调用递归。

我面临的问题是跟踪活动线程并在没有提交新线程时关闭线程池。我想使用多线程从我的代码中获得最大性能。

我按照this 教程创建了自己的线程池并使用了

   public int getTaskQueueSize() {
    return taskQueue.size();
   }

ThreadPool 类中获取活动线程数。

在我正在使用的主类中

     void shutdownExecutor() {
    while (tp.getTaskQueueSize() != 0) {
          try {
            Thread.sleep(100);
        } catch (InterruptedException ex) {
            Logger.getLogger(HamiltonPaths.class.getName()).log(Level.SEVERE, null, ex);
        }
         //  System.out.println("Not STopping tp");
    }
    tp.shutdown();
    // System.out.println("Stopped tp");

}

在主类中关闭线程池。但过了一段时间它停止产生新线程。但是一个线程继续以递归方式完成工作。

更新 1:添加代码 所以我发现向线程池提交任务工作正常。但是我不小心添加了一个最近更改代码的错误,这阻止了我向线程池提交更多任务并从shutdownExecutor() 函数关闭线程池,因为tp.getTaskQueueSize() 正在返回队列的初始大小或任务没有被删除来自阙。

我正在使用以下逻辑来决定是提交新任务还是继续递归工作。

if ((this.tp.getTaskQueueSize() < threadLimit) && (!this.tp.isPoolShutDownInitiated())) {
                spawnNewThread(new PTask(cp, get));//Class that implements the Runnable and do the same thing as the function called in below statement.

            } else {
                PPath(cp, get);// call to the function
            }

BlockingQueueCustom.java

 package threadpool;

 abstract interface BlockingQueueCustom<E>
 {
   public abstract void put(E paramE)
     throws InterruptedException;

   public abstract E take()
     throws InterruptedException;

   public abstract int size();
 }

LinkedBlockingQueueCustom.java

package threadpool;

import java.util.LinkedList;
import java.util.List;

class LinkedBlockingQueueCustom<E>
    implements BlockingQueueCustom<E> {

private List<E> queue;
private int maxSize;

public LinkedBlockingQueueCustom(int maxSize) {
    this.maxSize = maxSize;
    this.queue = new LinkedList();
}

public synchronized void put(E item)
        throws InterruptedException {
    if (this.queue.size() == this.maxSize) {
        wait();
    }

    this.queue.add(item);
    notifyAll();
}

public synchronized E take()
        throws InterruptedException {
    if (this.queue.isEmpty()) {
        wait();
    }

    notifyAll();
    if (this.queue.isEmpty()) {
        return null;
    }
    return (E) this.queue.remove(0);
}

public synchronized int size() {
    return this.queue.size();
}
}

ThreadPool.java

package threadpool;

import java.util.logging.Level;
import java.util.logging.Logger;

public class ThreadPool {

private BlockingQueueCustom<Runnable> taskQueue;
int size = 0;
int taskExecuted = 0;
ThreadPoolsThread[] threadPoolsThread;

public int getTaskExecuted() {
    return this.taskExecuted;
}

public synchronized void dectaskExec() {
    this.taskExecuted -= 1;
}

public int getSize() {
    return this.size;
}

public BlockingQueueCustom<Runnable> getTaskQueue() {
    return this.taskQueue;
}

public int getTaskQueueSize() {
    return this.taskQueue.size();
}

private boolean poolShutDownInitiated = false;

public ThreadPool(int nThreads) {
    this.taskQueue = new LinkedBlockingQueueCustom(nThreads);
    this.size = nThreads;
    this.threadPoolsThread = new ThreadPoolsThread[nThreads + 1];

    for (int i = 1; i <= nThreads; i++) {
        this.threadPoolsThread[i] = new ThreadPoolsThread(this.taskQueue, this);
        this.threadPoolsThread[i].setName("" + i);

        this.threadPoolsThread[i].start();
    }
}

public synchronized void execute(Runnable task) {
    if (this.poolShutDownInitiated) {
        try {
            throw new Exception("ThreadPool has been shutDown, no further tasks can be added");
        } catch (Exception ex) {
            Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
    this.taskExecuted += 1;

    try {
        this.taskQueue.put(task);
    } catch (InterruptedException ex) {
        Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
    }
}

public boolean isPoolShutDownInitiated() {
    return this.poolShutDownInitiated;
}

public synchronized void shutdown() {
    this.poolShutDownInitiated = true;
}
}

ThreadPoolsThread.java

package threadpool;

import java.util.logging.Level;
import java.util.logging.Logger;

class ThreadPoolsThread
    extends Thread {

private BlockingQueueCustom<Runnable> taskQueue;
private ThreadPool threadPool;

public ThreadPoolsThread(BlockingQueueCustom<Runnable> queue, ThreadPool threadPool) {
    this.taskQueue = queue;
    this.threadPool = threadPool;
}

public void run() {
    for (;;) {
        Runnable runnable = null;
        while ((!this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
        }

        if ((this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
            break;
        }

        try {
            runnable = (Runnable) this.taskQueue.take();
        } catch (InterruptedException ex) {
            Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
            break;
        }

        if (runnable == null) {
            break;
        }

        runnable.run();

        if ((this.threadPool.isPoolShutDownInitiated())
                && (this.taskQueue.size() == 0)) {
            interrupt();

            try {
                Thread.sleep(1L);
            } catch (InterruptedException ex) {
                Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
                break;
            }
        }
    }
}
}

【问题讨论】:

  • 您不会将线程提交到线程池。您提交任务以供池中的线程执行。
  • @Raedwald 感谢您纠正我的术语。

标签: java multithreading java-8 threadpool threadpoolexecutor


【解决方案1】:

似乎存在根本性的误解。您不会“向线程池提交新线程”,而是将 tasks 提交给执行器服务(可能实现为线程池)。执行器服务管理线程并决定是否启动、停止或重用它们。如果您将执行程序配置为具有最大线程数,它将使用不超过指定数量。如果您提交的任务多于可用线程,这些任务将被排队。因此,队列的大小不是线程数。

一般来说,您不应该实现自己的线程池,尤其是当您还没有完全理解它们的作用时。 Java 已经有十多年的内置实现了。您可以使用Executors.newFixedThreadPool(threadCount) 来获取使用指定线程数的线程池执行器。

如果你想强制任务不入队,而是在所有线程都忙时总是由调用者执行,实现起来并不难。显式创建ThreadPoolExecutor,将the constructor allowing to specify a custom queue and RejectedExecutionHandlerSynchronousQueueCallerRunsPolicy 一起使用:

ThreadPoolExecutor e=new ThreadPoolExecutor(threadCount, threadCount,
    1, TimeUnit.MINUTES, new SynchronousQueue<>(), Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.CallerRunsPolicy());

在 Java 8 之前,您必须明确队列类型:new SynchronousQueue&lt;Runnable&gt;()
这会将提交的任务传递给空闲线程(如果有),否则在提交线程中执行。请注意,当核心大小和最大大小相同时,如上例所示,超时值无关紧要。

如果您已提交所有任务,您可以简单地调用shutDown() 而无需额外检查。它仍然会在停止所有线程之前执行挂起的任务。

【讨论】:

  • 感谢您的回答。如何获得活动线程的数量,以便我可以实现提交新任务的逻辑??
  • 我想,我详细解释了你不需要知道这个数字,因为代码示例的执行者已经实现了这个逻辑,不需要任何额外的努力。您所需要的只是调用executesubmit 与一个新任务,如果有空闲线程,它将在另一个线程中运行,如果它们都忙,它会在调用者的线程中运行。查询活动线程的数量是可能的,但没有意义,因为当查询方法返回时,这个数字可能已经过时了。
  • 是的,我明白你想说什么。但我需要知道活动任务的确切数量,以便我的逻辑可以选择是继续递归工作还是提交新任务以利用最大并行度。
猜你喜欢
  • 2015-01-12
  • 2015-06-26
  • 2016-03-15
  • 2010-09-07
  • 2019-10-08
  • 1970-01-01
  • 1970-01-01
  • 2019-08-06
  • 1970-01-01
相关资源
最近更新 更多