【问题标题】:Partial retrieval of data from a batch of tasks从一批任务中提取部分数据
【发布时间】:2018-04-25 13:28:52
【问题描述】:

我正在使用 ExecutorService 提交一批任务。我正在这样做:

ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
List<ListenableFuture<Whatever>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
  results.add(exec.submit(new MyTask(i)));
}

ListenableFuture<List<Whatever>> listListenableFuture = Futures.successfulAsList(futures);

try {
    List<Whatever> responses = listListenableFuture.get(2000, TimeUnit.MILLISECONDS);
    for (Whatever response : responses) {
      LOG.info("Yay!");
    }
} catch (TimeoutException e) {
  LOG.info("Timeout Exception");
} catch (Exception e) {
  // Nay! 
}

这里的问题是 - 如果其中一项任务花费的时间超过 2000 毫秒,则会抛出 TimeoutException,尽管某些任务可能已经完成,但我将在响应中一无所获。

所以我想检索已完成的任务的响应(无论是部分还是完整),直到超时(2000 毫秒)。例如:

(相对于批处理调用的 START_TIME 的时间)
任务 1:1000 毫秒
任务 2:3000 毫秒
任务 3:1800 毫秒


输出:
超时异常


期望的输出:
耶! 耶!

我想到的一个解决方案是单独获取期货并将它们的超时设置为MAX(0, TIME_OUT - TIME_NOW - START_TIME)。这可能有效,但对我来说似乎不是一个干净的解决方案。

【问题讨论】:

    标签: java multithreading concurrency executorservice callable


    【解决方案1】:

    您可以使用处理超时的可调用装饰。

    假设这是原始的可调用对象:

    class OriginalCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "";
        }
    }
    

    你可以用这个原始的可调用对象和执行器构造一个装饰可调用对象:

    class DecorateCallable implements Callable<String> {
        ExecutorService executorService;
        OriginalCallable callable;
    
        public DecorateCallable(ExecutorService executorService, OriginalCallable callable) {
            this.executorService = executorService;
            this.callable = callable;
        }
    
        @Override
        public String call() throws Exception {
            Future<String> future = executorService.submit(callable);
            try {
                return future.get(2000, TimeUnit.SECONDS);
            } catch (TimeoutException | InterruptedException e) {
    
            }
            return null;
        }
    }
    

    如果你决定使用这个,你需要双倍的池大小:

    Executors.newFixedThreadPool(threads * 2);
    

    并在将它们放入最终结果集之前添加一些条件,例如if(future.get() != null)

    【讨论】:

      【解决方案2】:

      如果你使用 Futures.getChecked,超时异常会被吞没,future 会返回 null。看看下面的代码,其中一个任务抛出了TimeoutException,对应的future返回null。

      import java.io.IOException;
      
      import com.google.common.util.concurrent.*;
      import java.time.*;
      import java.util.*;
      import java.util.concurrent.*;
      import java.util.stream.*;
      
      public class App 
      {
          public static void main( String[] args ) throws InterruptedException, ExecutionException
          {
              ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
              ListenableFuture<String> future1 = 
                  listeningExecutorService.submit(() -> {
                      throw new TimeoutException("Timeout exception");
                  });
              ListenableFuture<String> future2 = 
                  listeningExecutorService.submit(() -> "Hello World");
              ListenableFuture<List<String>> combined = Futures.successfulAsList(future1, future2);
              try {
                  String greeting = Futures.getChecked(combined, IOException.class, 2000l, TimeUnit.MILLISECONDS).stream().collect(Collectors.joining(" "));
                  System.out.println(greeting);
              } catch (IOException e) {
                  System.out.println("Exception: " + e.getMessage());
              } finally {
                  listeningExecutorService.shutdown();
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2019-11-02
        • 2013-03-29
        • 2013-05-18
        • 1970-01-01
        • 2017-07-06
        • 2011-07-08
        • 2014-12-11
        • 1970-01-01
        • 2017-03-03
        相关资源
        最近更新 更多