【发布时间】:2015-10-05 17:09:12
【问题描述】:
简介:
我开发了一个类,它可以接受Tasks 的数量,并行执行它们并等待特定时间的结果。如果某些任务在给定的超时时间内未能完成,它将中断整个执行并仅返回可用的结果。
问题:
一开始一切正常,但一段时间后 CPU 使用率增加到 100%,应用程序显然无法响应。
您能否尝试帮助我找到问题或提出更好的解决方案,以实现相同的目标?
代码:
TaskService.java
public abstract class TaskService {
private static final org.slf4j.Logger InfoLogger = LoggerFactory.getLogger("InfoLogger");
private static final org.slf4j.Logger ErrorLogger = LoggerFactory.getLogger("ErrorLogger");
@Autowired
private TimeLimiter timeLimiter;
public List<TaskResult> execute(TaskType taskType, TimeUnit timeUnit, long timeout, final Task... tasks){
final List<TaskResult> taskResultsStorage = new ArrayList<>();
try {
timeLimiter.callWithTimeout(new Callable<List<TaskResult>>() {
@Override
public List<TaskResult> call() throws Exception {
return run(taskResultsStorage, tasks);
}
}, timeout, timeUnit, true);
} catch (UncheckedTimeoutException e) {
String errorMsg = String.format("Time out of [%s] [%s] has been exceeded for task type:[%s]", timeout, timeUnit.name(), taskType.name());
ErrorLogger.error(errorMsg, e);
} catch (Exception e) {
String errorMsg = String.format("Unexpected error for task type:[%s]", taskType.name());
ErrorLogger.error(errorMsg, e);
}
return taskResultsStorage;
}
protected abstract List<TaskResult> run(List<TaskResult> taskResults,Task... tasks) throws ExecutionException, InterruptedException;
}
AsynchronousTaskService.java
public class AsynchronousTaskService extends TaskService {
private CompletionService<TaskResult> completionService;
public AsynchronousTaskService(ThreadExecutorFactory threadExecutorFactory){
this.completionService = new ExecutorCompletionService<TaskResult>(threadExecutorFactory.getExecutor());
}
@Override
protected List<TaskResult> run(List<TaskResult> resultStorage, Task... tasks) throws ExecutionException, InterruptedException {
List<Future<TaskResult>> futureResults = executeTask(tasks);
awaitForResults(futureResults, resultStorage);
return resultStorage;
}
private List<Future<TaskResult>> executeTask(Task... tasks){
List<Future<TaskResult>> futureTaskResults = new ArrayList<>();
if(tasks!=null) {
for (Task task : tasks) {
if (task != null) {
futureTaskResults.add(completionService.submit(task));
}
}
}
return futureTaskResults;
}
private void awaitForResults(List<Future<TaskResult>> futureResults, List<TaskResult> resultStorage) throws ExecutionException, InterruptedException {
int submittedTasks = futureResults.size();
int taskCompleted = 0;
if(futureResults != null){
while(taskCompleted < submittedTasks){
Iterator<Future<TaskResult>> it = futureResults.iterator();
while(it.hasNext()){
Future<TaskResult> processingTask = it.next();
if(processingTask.isDone()){
TaskResult taskResult = processingTask.get();
resultStorage.add(taskResult);
it.remove();
taskCompleted++;
}
}
}
}
}
}
ThreadExecutorFactory.java
@Component
public class ThreadExecutorFactory {
private int THREAD_LIMIT = 100;
private final Executor executor;
public ThreadExecutorFactory() {
executor = Executors.newFixedThreadPool(THREAD_LIMIT,
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
}
public Executor getExecutor() {
return executor;
}
}
Task.java
public abstract class Task<T extends TaskResult> implements Callable<T> {
}
TaskResult.java
public abstract class TaskResult {
}
【问题讨论】:
-
您是否尝试降低 THREAD_LIMIT?例如。到您系统的核心数量(例如使用
Runtime.getRuntime().availableProcessors())? -
线程数多于处理器(或超线程 CPU 的 2* 处理器)的主题将显着降低性能。这可能不是你的问题,但 100 个线程是个坏主意
-
感谢你们的 cmets 伙计们,我会尽量减少池中的线程数。