【发布时间】:2010-09-12 15:00:06
【问题描述】:
我正在寻找最简单、最直接的方法来实现以下内容:
- 主程序实例化worker 线程来执行任务。
- 一次只能运行
n任务。 - 当达到
n时,不再有工人 开始直到计数 正在运行的线程回落到n以下。
【问题讨论】:
标签: java multithreading concurrency
我正在寻找最简单、最直接的方法来实现以下内容:
n 任务。n 时,不再有工人
开始直到计数
正在运行的线程回落到n以下。【问题讨论】:
标签: java multithreading concurrency
正如这里的其他人所提到的,最好的办法是使用 Executors 类创建一个线程池:
但是,如果您想自己动手,这段代码应该让您知道如何继续。基本上,只需将每个新线程添加到线程组,并确保组中的活动线程永远不会超过 N:
Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
if( group.activeCount()<N && i<tasks.length ) {
new TaskThread(group, tasks[i]).start();
i++;
} else {
Thread.sleep(100);
}
}
【讨论】:
使用Executor框架;即newFixedThreadPool(N)
【讨论】:
Executors.newFixedThreadPool(int)
Executor executor = Executors.newFixedThreadPool(n);
Runnable runnable = new Runnable() {
public void run() {
// do your thing here
}
}
executor.execute(runnable);
【讨论】:
我认为Executors.newFixedThreadPool 符合您的要求。有许多不同的方法可以使用生成的 ExecutorService,这取决于您是否希望将结果返回到主线程,或者任务是否完全独立,以及您是否有一组任务要预先执行,或者任务是否排队以响应某些事件。
Collection<YourTask> tasks = new ArrayList<YourTask>();
YourTask yt1 = new YourTask();
...
tasks.add(yt1);
...
ExecutorService exec = Executors.newFixedThreadPool(5);
List<Future<YourResultType>> results = exec.invokeAll(tasks);
或者,如果您有一个新的异步任务要执行以响应某个事件,您可能只想使用 ExecutorService 的简单 execute(Runnable) 方法。
【讨论】:
如果你的任务队列不是无限的并且任务可以在更短的时间间隔内完成,你可以使用Executors.newFixedThreadPool(n);正如专家建议的那样。
此解决方案的唯一缺点是任务队列大小无限制。你无法控制它。任务队列中的大量堆积会降低应用程序的性能,并且在某些情况下可能会导致内存不足。
如果您想使用ExecutorService 并启用 work stealing 机制,其中空闲工作线程通过窃取任务队列中的任务来分担繁忙工作线程的工作负载。它将返回 ForkJoinPool 类型的 Executor Service。
public static
ExecutorService newWorkStealingPool(int parallelism)创建一个线程池来维护足够的线程来支持给定的并行级别,并且可以使用多个队列来减少争用。并行度级别对应于主动参与或可参与任务处理的最大线程数。实际线程数可能会动态增长和收缩。工作窃取池不保证提交任务的执行顺序。
我更喜欢ThreadPoolExecutor,因为 API 可以灵活地控制许多参数,从而控制流任务的执行。
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
在你的情况下,同时设置corePoolSize and maximumPoolSize as N。在这里您可以控制任务队列大小,定义自己的自定义线程工厂和拒绝处理程序策略。
查看相关的 SE 问题以动态控制池大小:
【讨论】:
如果你想自己动手:
private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);
private boolean roomLeft() {
synchronized (workers) {
return (workers.size() < MAX_WORKERS);
}
}
private void addWorker() {
synchronized (workers) {
workers.add(new Worker(this));
}
}
public void removeWorker(Worker worker) {
synchronized (workers) {
workers.remove(worker);
}
}
public Example() {
while (true) {
if (roomLeft()) {
addWorker();
}
}
}
Worker 是你的扩展 Thread 的类。每个工作人员在完成工作后都会调用这个类的 removeWorker 方法,将自己作为参数传入。
话虽如此,Executor 框架看起来好多了。
编辑:有人愿意解释为什么这会如此糟糕,而不是仅仅降低它吗?
【讨论】:
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
/* ...execute the task to run concurrently as a runnable: */
exec.execute(new Runnable() {
public void run() {
/* do the work to be done in its own thread */
System.out.println("Running in: " + Thread.currentThread());
}
});
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
/* The tasks are now running concurrently. We wait until all work is done,
* with a timeout of 50 seconds: */
boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
/* If the execution timed out, false is returned: */
System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
【讨论】: