【问题标题】:Get result from FutureTask after canceling it取消 FutureTask 后获取结果
【发布时间】:2014-07-29 11:44:28
【问题描述】:

考虑在Callable 实例中进行长时间运行的计算。

并且考虑到这个计算的结果可能有一些取决于计算时间的精度,即:如果任务将被取消,那么它应该在取消之前返回到目前为止计算的内容(例如,我们有一个无理数传送器计算)。

最好使用标准的 java 并发工具来实现这个范例,例如

Callable<ValuableResult> task = new Callable<>() { ... };
Future<ValuableResult> future = Executors.newSingleThreadExecutor().submit(task);
try {
    return future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    future.cancel(true);
    // HERE! Get what was computed so far
}

看来,如果不完全重新实现FutureThreadPoolExecutor 接口,这个问题就无法解决。 Java 1.7 中是否有任何方便的现有工具?

【问题讨论】:

    标签: java concurrency futuretask


    【解决方案1】:

    不是通过 Future 的 API 取消它,而是通过你自己的机制告诉它完成(例如你传递给构造函数的 long,它告诉它在正常返回之前运行多长时间;或者一个 @ 987654322@你设置为真)。

    请记住,一旦任务真正开始,cancel (true) 不会神奇地停止它。然后它所做的就是中断线程。有一些方法可以检查此标志并抛出 InterruptedException,否则您将不得不手动检查 isInterrupted 标志。那么,既然您无论如何都需要编写这种协作机制,为什么不让它更适合您的需求呢?

    【讨论】:

    • 在这种情况下,使用简单的Thread api 和一些CountDownLatchwait 和超时方法来实现所有这些事情会更容易。
    • 取决于你想要它做什么。 Callable 可以提交给任何类型的 Executor,如果你想要线程池,这很有用;最好不要为每个任务启动一段时间的新线程。
    【解决方案2】:

    嗯,在我看来,在这种情况下最简单的方法是准备一些 final ResultWrapper 对象,它将在这个 Callable 实例中传递:

    final ValuableResultWrapper wrapper = new ValuableResultWrapper();
    final CountDownLatch latch = new CountDownLatch(1);
    
    Callable<ValuableResultWrapper> task = new Callable<>() { 
       ... 
       wrapper.setValue(...); // here we set what we have computed so far
       latch.countDown();
       return wrapper;
       ...  
    };
    Future<ValuableResultWrapper> future = Executors.newSingleThreadExecutor().submit(task);
    try {
        return future.get(timeout, TimeUnit.SECONDS);
    } catch (TimeoutException te) {
        future.cancel(true);
        // HERE! Get what was computed so far
        latch.await();
        return wrapper;
    }
    

    UPD:在这样的实现中(变得复杂),我们必须引入某种闩锁(在我的示例中为CountDownLatch),以确保该任务将在我们完成之前完成@ 987654326@

    【讨论】:

      【解决方案3】:

      CompletionSerivceFutureTask 更强大,在许多情况下它更合适。我从中得到一些想法来解决问题。此外,它的子类 publicExecutorCompletionService 比 FutureTask 简单,只包含几行代码。这很容易阅读。所以我修改了类以获得部分计算的结果。对我来说是一个令人满意的解决方案,毕竟它看起来简单明了。

      演示代码:

      CompletionService<List<DeviceInfo>> completionService =
                      new MyCompletionService<>(Executors.newCachedThreadPool());   
              Future task = completionService.submit(detector);
          try {
              LogHelper.i(TAG, "result 111: " );
              Future<List<DeviceInfo>> result = completionService.take();
              LogHelper.i(TAG, "result: " + result.get());
          } catch (InterruptedException e) {
              e.printStackTrace();
          } catch (ExecutionException e) {
              e.printStackTrace();
          }
      

      这是课程代码:

      import java.util.concurrent.AbstractExecutorService;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CancellationException;
      import java.util.concurrent.CompletionService;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Executor;
      import java.util.concurrent.Future;
      import java.util.concurrent.FutureTask;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.RunnableFuture;
      import java.util.concurrent.TimeUnit;
      
      /**
      *  This is a CompletionService like java.util.ExecutorCompletionService, but we can get partly computed result
       *  from our FutureTask which returned from submit, even we cancel or interrupt it.
       *  Besides, CompletionService can ensure that the FutureTask is done when we get from take or poll method.
       */
      public class MyCompletionService<V> implements CompletionService<V> {
          private final Executor executor;
          private final AbstractExecutorService aes;
          private final BlockingQueue<Future<V>> completionQueue;
      
          /**
           * FutureTask extension to enqueue upon completion.
           */
          private static class QueueingFuture<V> extends FutureTask<Void> {
              QueueingFuture(RunnableFuture<V> task,
                             BlockingQueue<Future<V>> completionQueue) {
                  super(task, null);
                  this.task = task;
                  this.completionQueue = completionQueue;
              }
              private final Future<V> task;
              private final BlockingQueue<Future<V>> completionQueue;
              protected void done() { completionQueue.add(task); }
          }
      
          private static class DoneFutureTask<V> extends FutureTask<V> {
              private Object outcome;
      
              DoneFutureTask(Callable<V> task) {
                  super(task);
              }
      
              DoneFutureTask(Runnable task, V result) {
                  super(task, result);
              }
      
              @Override
              protected void set(V v) {
                  super.set(v);
                  outcome = v;
              }
      
              @Override
              public V get() throws InterruptedException, ExecutionException {
                  try {
                      return super.get();
                  } catch (CancellationException e) {
                      return (V)outcome;
                  }
              }
      
          }
      
          private RunnableFuture<V> newTaskFor(Callable<V> task) {
                  return new DoneFutureTask<V>(task);
          }
      
          private RunnableFuture<V> newTaskFor(Runnable task, V result) {
                  return new DoneFutureTask<V>(task, result);
          }
      
          /**
           * Creates an MyCompletionService using the supplied
           * executor for base task execution and a
           * {@link LinkedBlockingQueue} as a completion queue.
           *
           * @param executor the executor to use
           * @throws NullPointerException if executor is {@code null}
           */
          public MyCompletionService(Executor executor) {
              if (executor == null)
                  throw new NullPointerException();
              this.executor = executor;
              this.aes = (executor instanceof AbstractExecutorService) ?
                      (AbstractExecutorService) executor : null;
              this.completionQueue = new LinkedBlockingQueue<Future<V>>();
          }
      
          /**
           * Creates an MyCompletionService using the supplied
           * executor for base task execution and the supplied queue as its
           * completion queue.
           *
           * @param executor the executor to use
           * @param completionQueue the queue to use as the completion queue
           *        normally one dedicated for use by this service. This
           *        queue is treated as unbounded -- failed attempted
           *        {@code Queue.add} operations for completed tasks cause
           *        them not to be retrievable.
           * @throws NullPointerException if executor or completionQueue are {@code null}
           */
          public MyCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
              if (executor == null || completionQueue == null)
                  throw new NullPointerException();
              this.executor = executor;
              this.aes = (executor instanceof AbstractExecutorService) ?
                      (AbstractExecutorService) executor : null;
              this.completionQueue = completionQueue;
          }
      
          public Future<V> submit(Callable<V> task) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<V> f = newTaskFor(task);
              executor.execute(new QueueingFuture<V>(f, completionQueue));
              return f;
          }
      
          public Future<V> submit(Runnable task, V result) {
              if (task == null) throw new NullPointerException();
              RunnableFuture<V> f = newTaskFor(task, result);
              executor.execute(new QueueingFuture<V>(f, completionQueue));
              return f;
          }
      
          public Future<V> take() throws InterruptedException {
              return completionQueue.take();
          }
      
          public Future<V> poll() {
              return completionQueue.poll();
          }
      
          public Future<V> poll(long timeout, TimeUnit unit)
                  throws InterruptedException {
              return completionQueue.poll(timeout, unit);
          }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-01-23
        • 1970-01-01
        • 2021-12-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多