【问题标题】:How to wait for list of `Future`s created using different `ExecutorServices`如何等待使用不同的`ExecutorService`创建`Future`列表
【发布时间】:2012-10-22 10:20:37
【问题描述】:

好的,所以我知道这里的第一个答案/评论将是“使用一个 ExecutorService 并使用 invokeAll”。但是,我们有充分的理由(我不会对此感到厌烦)将线程池分开。

所以我有一个线程池列表 (ExecutorServices),我需要做的是使用 submit 在每个线程池上调用不同的 Callable(那里没问题)。现在我有了这个Future 实例的集合,每个实例都在单独的ExecutorService 上创建,我想等待它们全部完成(并且能够提供一个超时,在该超时时取消任何未完成的操作)。

是否存在可以执行此操作的现有类(包装Future 实例的列表并允许等待所有操作完成)?如果没有,将不胜感激有关有效机制的建议。

正在考虑调用 get 并为每个调用设置超时,但必须计算每次调用所经过的总时间。

我看到这篇帖子 Wait Until Any of Future is Done 但这扩展了 Future 而不是包装它们的列表。

【问题讨论】:

  • 对于每个列表,当最后一个完成时 - 正如他们所说的那样
  • 更新帖子以明确超时是问题所在,我希望在这方面不要重新发明轮子。
  • 好吧,如果您阅读那些漂亮的 lil' java 文档,您会看到您可以 AwaitTermination 并设置超时,然后当您提交任务时,您可以将其发送关闭,如果任务未及时完成超时 wabamzorz 后关机!!!!
  • 他在哪里说执行器将被关闭?
  • 嗯。如果您可以将每个单独的服务包装在MoreExecutors.listeningDecorator(service) 中,则可以将它们全部转换为ListeningExecutorServices。然后你可以做Futures.allAsList 来创造一个“加入”的未来。

标签: java concurrency executorservice future


【解决方案1】:

根据 Louis 的评论,我正在寻找的是 Futures.successfulAsList

这使我可以等待所有操作都完成,然后检查是否有任何失败的期货。

番石榴规则!

【讨论】:

    【解决方案2】:

    我不认为 JDK 提供了一个直接的 API 可以让你做到这一点。但是,我认为创建一个执行此操作的简单方法同样简单。您可能想看看 AbstractExecutorService.invokeAll() 的实现,以了解可以做到这一点。

    基本上,您会在每个未来调用future.get(),将等待时间减少每次等待结果所需的时间,并在从方法返回之前取消所有未完成的未来。

    【讨论】:

    • +1 同意。我已经想到了这一点,并将其作为明显的机制包含在我的帖子中。我希望有一个更好的/已经实现的机制并且其他人知道。
    • 这可能是你能得到的最好的实现。 JDK 创建者将其作为独立(静态)方法提供是很简单的。另一种实现可能涉及使用完成服务并按照任务完成的顺序等待。然而,代码变得更加复杂,几乎没有额外的好处。
    【解决方案3】:

    这可能需要一些清理,但它应该可以解决您的问题。 (时间和空间省略了一些封装):

    public static <T> LatchWithWrappedCallables<T> wrapCallables(Collection<Callable<T>> callablesToWrap)
    {
        CountDownLatch latch = new CountDownLatch(callablesToWrap.size());
        List<Callable<T>> wrapped = new ArrayList<Callable<T>>(callablesToWrap.size());
        for (Callable<T> currCallable : callablesToWrap)
        {
            wrapped.add(new CallableCountdownWrapper<T>(currCallable, latch));
        }
    
        LatchWithWrappedCallables<T> returnVal = new LatchWithWrappedCallables<T>();
        returnVal.latch = latch;
        returnVal.wrappedCallables = wrapped;
        return returnVal;
    }
    
    public static class LatchWithWrappedCallables<T>
    {
        public CountDownLatch latch;
        public Collection<Callable<T>> wrappedCallables;
    }
    
    public static class CallableCountdownWrapper<T> implements Callable<T>
    {
        private final Callable<T> wrapped;
    
        private final CountDownLatch latch;
    
        public CallableCountdownWrapper(Callable<T> wrapped, CountDownLatch latch)
        {
            this.wrapped = wrapped;
            this.latch = latch;
        }
    
        @Override
        public T call() throws Exception
        {
            try
            {
                return wrapped.call();
            }
            finally
            {
                latch.countDown();
            }
        }
    }
    

    那么你的代码会这样称呼它:

    Collection<Callable<String>> callablesToWrap = [Your callables that you need to wait for here];
    LatchWithWrappedCallables<String> latchAndCallables = wrapCallables(callablesToWrap);
    
    [Submit the wrapped callables to the executors here]
    
    if(latchAndCallables.latch.await(timeToWaitInSec, TimeUnit.SECONDS))
    {
        [Handling for timeout here]
    }
    

    【讨论】:

    • 再想一想,我认为我可能会将 await 直接滚动到 LatchWithWrappedCallables 类中,以使其更容易。 (是的,“LatchWithWrappedCallables”是个糟糕的名字!)
    • 明天我会对此进行更深入的思考,看看它是否符合我的需要。谢谢。
    • 不错。我也需要看看 ListenableFuture 的事情。
    • 原来Futures.successfulAsList 就是我要找的东西
    【解决方案4】:

    也许我真的没有明白。但是,对我来说,它仍然听起来很简单

    public <V> List<V> get(List<Future<V>> futures, long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException {
        List<V> result = new ArrayList<V>();
        long end = System.nanoTime() + unit.toNanos(timeout);
        for (Future<V> f: futures) {
            result.add(f.get(end - System.nanoTime(), TimeUnit.NANOSECONDS));
        }
        return result;
    }
    

    我错了吗?

    我认为您链接的问题要复杂得多,因为他们只想等待最快,当然不知道哪个会最快。

    【讨论】:

    • +1 这个问题是get 可以抛出异常。完成后将其更改为返回Futures 会有所帮助。此外,它不处理关机。这是显而易见的实现,我想看看是否有一些更干净/已经实现的测试。
    猜你喜欢
    • 2018-09-22
    • 1970-01-01
    • 2013-04-21
    • 1970-01-01
    • 2015-02-14
    • 2011-06-24
    • 1970-01-01
    • 2018-09-18
    相关资源
    最近更新 更多