【问题标题】:Using a shared ExecutorService as a task queue, how do I know when a job is complete?使用共享的 ExecutorService 作为任务队列,我如何知道作业何时完成?
【发布时间】:2012-08-09 12:24:42
【问题描述】:

我有一个处理较大作业的 JobService。作业被动态细分为多个任务,任务也可能产生子任务等,因此无法预测一个作业的任务总数。每个任务排队通过ExecutorService.submit(...) 运行。问题是我似乎必须为每个作业创建一个单独的 ExecutorService,因为判断“作业队列”何时完成的唯一方法是使用ExecutorService.awaitTermination(...)。不过这似乎效率低下,因为在作业和它们的 ExecutorService 之间有I can't share a single threadpool

我正在寻找一些替代方案,我正在考虑为每项工作使用 AtomicInteger。提交新任务时递增,任务完成时递减。但是我必须轮询它什么时候为零,这看起来很混乱,还有一些异常处理混乱。

看来一定有更好的解决方案?

【问题讨论】:

  • 你检查过CompletionService吗?您可以将 submit 与将其与正在完成的 Job 关联的值一起使用。
  • 查看the javadoc of ExecutorCompletionService 以获取示例代码。
  • 我确实看过CompletionService,但这似乎不是我想要的,因为我不关心处理个别结果,我只想知道它们什么时候完成,所以 poll()/take() 一个一个看起来很乱,我真的只想知道completionQueue什么时候是空的。也许我应该考虑扩展它以达到我的目的。
  • 你看过ExecutorSerice的invokeAll方法吗?不确定这是否能满足您的需求
  • InvokeAll 是我最初使用的,但问题是我提交给 invokeAll 的一些任务会反过来提交更多任务,我也需要知道它们何时完成。

标签: java concurrency threadpool


【解决方案1】:

Submit 返回一个可用于等待任务完成的 Future 对象。您可以跟踪这些并添加一个递归阻塞直到所有子任务完成的方法。这样您就可以在任何需要的地方重用执行器。

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class JobExecutor {
    ExecutorService executorService = Executors.newFixedThreadPool(1);

    private class Task implements Runnable {
        private final String name;
        private final Task[] subtasks;
        private final ExecutorService executorService;
        private volatile boolean started = false;
        private Future<?> taskFuture;

        // Separate list from subtasks because this is what you'll probably
        // actually use as you may not be passing subtasks as constructor args
        private final List<Task> subtasksToWaitOn = new ArrayList<Task>();

        public Task(String name, ExecutorService executorService,
                Task... subtasks) {
            this.name = name;
            this.executorService = executorService;
            this.subtasks = subtasks;
        }

        public synchronized void start() {
            if (!started) {
                started = true;
                taskFuture = executorService.submit(this);
            }
        }

        public synchronized void blockTillDone() {
            if (started) {
                try {
                    taskFuture.get();
                } catch (InterruptedException e) {
                    // TODO Handle
                } catch (ExecutionException e) {
                    // TODO Handle
                }
                for (Task subtaskToWaitOn : subtasksToWaitOn) {
                    subtaskToWaitOn.blockTillDone();
                }
            } else {
                // TODO throw exception
            }
        }

        @Override
        public void run() {
            for (Task subtask : subtasks) {
                subtask.start();
                subtasksToWaitOn.add(subtask);
            }
            System.out.println("My name is: " + name);
        }
    }

    void testSubmit() {
        Task subsubTask1 = new Task("Subsubtask1", executorService);
        Task subtask1 = new Task("Subtask1", executorService, subsubTask1);
        Task subtask2 = new Task("Subtask2", executorService);
        Task subtask3 = new Task("Subtask3", executorService);
        Task job = new Task("Job", executorService, subtask1, subtask2,
                subtask3);
        job.start();
        job.blockTillDone();
        System.out.println("Job done!");
    }

    public static void main(String[] args) {
        new JobExecutor().testSubmit();
    }
}

打印出来:

My name is: Job
My name is: Subtask1
My name is: Subtask2
My name is: Subtask3
My name is: Subsubtask1
Job done!

【讨论】:

  • 谢谢!这是我前进的方向,问题是任务是动态添加的,而不是预先添加的,所以在 blockTillDone() 中,我将在 subtaskToWaitOn 循环中获得 ConcurrentModificationExceptions。
  • 这个解决方案考虑到了这一点。阻塞直到完成首先阻塞,直到父任务的运行方法完成(taskFuture.get())。此时,所有子任务都已添加。我实际上是在 run 方法中将所有子任务添加到 subtasksToWaitOn 中,以模拟子任务的动态添加。
  • 你是对的,我有一些额外的垃圾导致了这个问题。我现在已经做得很好了。那是你的帮助!对于那里的谷歌人,我想我最终还是想使用 Fork-Join 池,但我会等到我们的项目到达 Java7。
【解决方案2】:

如果您使用的是 java7(或带有后向库 http://www.cs.washington.edu/homes/djg/teachingMaterials/grossmanSPAC_forkJoinFramework.html 的 java6),您可能需要考虑使用 Fork-Join 池来处理此类事情:

class MainTask extends RecursiveTask<Long> {

    @Override
    protected Long compute() {
        SubTask subtask0 = new SubTask(0L);
        SubTask subtask1 = new SubTask(1L);
        SubTask subtask2 = new SubTask(2L);
        SubTask subtask3 = new SubTask(3L);
        SubTask subtask4 = new SubTask(4L);
        SubTask subtask5 = new SubTask(5L);

        subtask1.fork();
        subtask2.fork();
        subtask3.fork();
        subtask4.fork();
        subtask5.fork();

        return subtask0.compute() +
                subtask1.join() +
                subtask2.join() +
                subtask3.join() +
                subtask4.join() +
                subtask5.join();
    }

}

class SubTask extends RecursiveTask<Long> {
    private Long rawResult = null;

    private Long expected = null;

    public SubTask(long expected) {
        this.expected = expected;
    }

    @Override
    protected Long compute() {
        return expected;
    }
}

public static void main( String[] args )
{
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Long result = forkJoinPool.invoke(new MainTask());
    System.out.println(result);
}

显然,这具有硬编码的子任务,但没有理由不能将参数传递给主任务,并使用它来生成子任务。子任务本身不必都是同一类型,但它们都应该扩展 RecursiveTask。实际上,如果一个任务生成子任务(如上面的 MainTask),至少其中一个子任务应该直接在其上调用“计算”​​(而不是分叉和连接),以便当前线程可以执行一些计算,并让其他线程完成其余的工作。

【讨论】:

  • ps:这种事情正是 ForkJoinPool 的目的。
  • 我现在坚持使用 java6,但这看起来确实是最优雅/正确的解决方案,我会更多地使用 jsr166 的东西。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-16
相关资源
最近更新 更多