【Question title】: How to release resource in canceled CompletableFuture
【Posted】: 2014-09-25 14:21:45
【Question】:

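Use case

Suppose we start an execution with CompletableFuture.runAsync(..), and the runnable contains a try-with-resources block (we are using some resource that should be closed whatever happens). At some point, while the try block has not yet finished, we cancel the CompletableFuture. Although the execution is stopped, the resource that should be closed is not closed: the close() of the AutoCloseable is never called.


Question

Is this a Java issue, or is there a way to do this properly, without hacky workarounds such as using plain futures (which support interruption, etc.)? And if this is the expected behavior, how should one handle a similar situation when a non-interruptible CompletableFuture is canceled?


The code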

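import java.util.concurrent.CompletableFuture;

import org.junit.Test; // assumption: JUnit 4 (with JUnit 5, import org.junit.jupiter.api.Test instead)

public class AutoClosableResourceTest {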

    public static class SomeService{
        public void connect(){
            System.out.println("connect");
        }

        public Integer disconnect(){
            System.out.println("disconnect");
            return null;
        }
    }

    public static class AutoClosableResource<T> implements AutoCloseable {

        private final T resource;
        private final Runnable closeFunction;

        private AutoClosableResource(T resource, Runnable closeFunction){
            this.resource = resource;
            this.closeFunction = closeFunction;
        }

        public T get(){
            return resource;
        }

        @Override
        public void close() throws Exception {
            closeFunction.run();
        }
    }

    @Test
    public void testTryWithResource() throws InterruptedException {
        SomeService service  = new SomeService();

        CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
            try (AutoClosableResource<SomeService> resource = new AutoClosableResource<>(service, service::disconnect)) {
                resource.get().connect();
                while (true) {
                    Thread.sleep(1000);
                    System.out.println("working...");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        Thread.sleep(2500);
        async.cancel(true);
        Thread.sleep(2500);

    }
}

This produces

connect
working...
working...
working...
working...

As you can see, it does not call close() ("disconnect" is never printed) and leaves the resource open...

【Comments】:

    Tags: java concurrency java-8 try-with-resources


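    【Solution 1】:

    You seem to misunderstand the purpose of CompletableFuture. Have a look at the first sentence of its class documentation:

    A Future that may be explicitly completed (setting its value and status), ...

    So unlike FutureTask, which is completed by the thread executing its run method, a CompletableFuture can be completed by any thread, which sets its value/status at an arbitrary point in time. The CompletableFuture does not know which thread will complete it, nor even whether any thread is currently working on its completion.

    Therefore a CompletableFuture cannot interrupt the right thread when it is canceled. That is a fundamental part of its design.

    If you want a worker thread that you can interrupt, you are better off using FutureTask/ThreadPoolExecutor. A task scheduled that way may still complete a CompletableFuture at its end.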
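A minimal sketch of that suggestion, assuming a single-threaded ExecutorService (whose submit() wraps the task in a FutureTask). The class name InterruptibleWorkerSketch, the helper runAndCancel() and the started/done futures are illustrative names, not part of the question's code. Cancelling the Future returned by submit() interrupts the worker, so try-with-resources closes the resource, and the task completes a CompletableFuture at its end:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleWorkerSketch {

    /** Cancels an interruptible worker and reports whether its resource was closed. */
    static boolean runAndCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean(false);
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> done = new CompletableFuture<>();

        // submit() wraps the task in a FutureTask, which knows its runner thread.
        Future<?> worker = pool.submit(() -> {
            try (AutoCloseable resource = () -> closed.set(true)) {
                started.complete(null);
                while (true) {
                    Thread.sleep(100); // throws InterruptedException after cancel(true)
                }
            } catch (Exception e) {
                // Reached after the interrupt; try-with-resources has already closed the resource.
            } finally {
                done.complete(null); // the task completes a CompletableFuture at its end
            }
        });

        started.join();      // wait until the worker is inside the try block
        worker.cancel(true); // interrupts the worker thread
        done.join();         // wait until the worker has unwound
        pool.shutdown();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("resource closed: " + runAndCancel());
    }
}
```

The important difference from the question's code is that cancel(true) is called on the Future returned by the executor, not on a CompletableFuture.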

    【Comments】:

      【Solution 2】:

      The following code is stuck in an infinite loop. Calling async.cancel does not communicate its wish to stop to this loop.

      while (true) {
          Thread.sleep(1000);
          System.out.println("working...");
      }
      

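      The test case still exits because the thread stuck in this loop is a daemon thread (by default, runAsync uses the common ForkJoinPool, whose worker threads are daemon threads).

      Replace the while loop condition with the following, which checks the isCancelled flag on each iteration. Calling CompletableFuture.cancel() marks the future as cancelled, but it does not interrupt the thread started via runAsync.

      while (!isCancelled()) { // keep working until the future has been cancelled
          Thread.sleep(1000);
          System.out.println("working...");
      }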
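One possible way to make the cancelled flag reachable from inside the loop is to create the CompletableFuture first and let the task capture it. The sketch below assumes that approach; the class name CooperativeCancelSketch, the helper runAndCancel() and the started/unwound futures are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class CooperativeCancelSketch {

    /** Cancels a polling worker and reports whether it closed its resource. */
    static boolean runAndCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean(false);
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> unwound = new CompletableFuture<>();

        // Create the future first so that the task can capture it and poll its state.
        CompletableFuture<Void> async = new CompletableFuture<>();
        pool.execute(() -> {
            try (AutoCloseable resource = () -> closed.set(true)) {
                started.complete(null);
                while (!async.isCancelled()) {
                    Thread.sleep(50); // one unit of work per iteration
                }
            } catch (Exception e) {
                async.completeExceptionally(e); // no effect if async is already cancelled
            } finally {
                unwound.complete(null);
            }
        });

        started.join();
        async.cancel(true); // only sets the state; the loop notices it on its next check
        unwound.join();     // wait until the worker has left the try block
        pool.shutdown();
        return closed.get() && async.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("closed after cancel: " + runAndCancel());
    }
}
```

The worker only reacts at the top of each iteration, so a blocking call inside the loop delays the reaction to cancel().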
      

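      【Comments】:

      • How would you access the CompletableFuture from within the Executor/Thread/Runnable/Callable/FutureTask, so that you can mark the task as completed/cancelled or check whether it is cancelled? I mean, you are calling isCancelled from the compute loop, which means you are assuming the compute loop is wrapped in a CompletableFuture subclass. I don't get it, sorry.
      【Solution 3】:

      You can use the complete method of CompletableFuture to stop waiting on the long-running task: it completes the future from another thread, although, as the output below shows, the worker loop itself keeps running.

      Below is a simple program showing the behavior: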

      package com.ardevco;
      
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
      
           ExecutorService pool = Executors.newFixedThreadPool(5);
      
           CompletableFuture<Integer> longRunningcompletableFuture = CompletableFuture.supplyAsync(() -> {
              for (int i = 0; i < 1; i--) {
                 System.out.println("i " + i);
                 sleep();
              }
              return 1; // we will never reach this line, so the worker thread stays stuck in the loop
           });
      
           CompletableFuture<Integer> completor = CompletableFuture.supplyAsync(() -> {
              System.out.println("completing the longRunningcompletableFuture");
              longRunningcompletableFuture.complete(1000);
              System.out.println("completed the longRunningcompletableFuture");
              return 10;
           });
      
           Thread.sleep(10000);
      
           System.out.println("completor...");
           int i = completor.get();
           System.out.println("completor i:" + i);
           System.out.println("completor...");
      
           System.out.println("completableFutureToBeCompleted2...");
           int i2 = longRunningcompletableFuture.get();
           System.out.println("completableFutureToBeCompleted2: " + i2);
           System.out.println("completableFutureToBeCompleted2...");
      
        }
      
        private static void sleep() {
           try {Thread.sleep(1000);}catch (Exception e) {}
        }
      

      }

      Output:

      i 0
      completing the longRunningcompletableFuture
      completed the longRunningcompletableFuture
      i -1
      i -2
      i -3
      i -4
      i -5
      i -6
      i -7
      i -8
      i -9
      i -10
      completor...
      completor i:10
      completor...
      completableFutureToBeCompleted2...
      completableFutureToBeCompleted2: 1000
      completableFutureToBeCompleted2...

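      【Comments】:

        【Solution 4】:

        Although there is an answer marked as correct, the real reason is quite different. Please see the documentation of the CompletableFuture.cancel(mayInterruptIfRunning) method and read the article "CompletableFuture can't be interrupted" to understand the problem better.

        This issue is addressed in my Tascalate Concurrent library. The change to your code should be, from:

        CompletableFuture<Void> async = CompletableFuture.runAsync(() -> { ... });

        to:

        Promise<Void> async = CompletableTask.runAsync(() -> { ... }, someExplicitExecutor);

        ...and you will get the expected behavior (the executor thread is interrupted, the AutoClosable is closed, and async is completed with a CancellationException).

        You can read more about the library in my blog.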

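        【Comments】:

          【Solution 5】:

          I also ran into this problem in Java SE 8. For me, it was important not to use third-party libraries.

          cancel(mayInterruptIfRunning): this value has no effect in this implementation because interrupts are not used to control processing.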
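The quoted sentence can be checked with a small experiment. This is a sketch only; the class name CancelDoesNotInterruptSketch and the started/observed futures are illustrative. The worker sleeps for half a second and records whether the sleep was interrupted after cancel(true):

```java
import java.util.concurrent.CompletableFuture;

class CancelDoesNotInterruptSketch {

    /** Returns whether the worker thread observed an interrupt after cancel(true). */
    static boolean workerWasInterrupted() {
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Boolean> observed = new CompletableFuture<>();

        CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
            started.complete(null);
            try {
                Thread.sleep(500);
                observed.complete(false); // slept the full time: nobody interrupted us
            } catch (InterruptedException e) {
                observed.complete(true);
            }
        });

        started.join();
        async.cancel(true);     // marks the future as cancelled...
        return observed.join(); // ...but the worker keeps sleeping undisturbed
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + workerWasInterrupted()); // prints "interrupted: false"
    }
}
```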

          The idea is to call Thread.interrupt() when cancel() is called, but this works only for a Runnable.

          /** Enable and disable the interrupt */
          private static class Interruptor {
          
              volatile boolean interrupted;
              volatile Runnable interrupt;
          
              /** Enable interrupt support */
              synchronized boolean start() {
                  if (interrupted) {
                      return false;
                  }
                  Thread runThread = Thread.currentThread();
                  interrupt = () -> {
                      if (runThread != Thread.currentThread()) {
                          runThread.interrupt();
                      }
                  };
                  return true;
              }
          
              /** Interrupt Runnable */
              synchronized void interrupt() {
                  if (interrupted) {
                      return;
                  }
                  interrupted = true;
                  if (interrupt != null) {
                      interrupt.run();
                      interrupt = null;
                  }
              }
          
              /** Disable interrupt support */
              synchronized void finish() {
                  interrupt = null;
              }
          }
          
          
          /** CompletableFuture with interrupt support */
          public static CompletableFuture<Void> runAsyncInterrupted(Runnable run) {
          
              final Interruptor interruptor = new Interruptor();
          
              Runnable wrap = () -> {
                  if (!interruptor.start()) { // allow interruption
                      return; // was canceled before the thread started
                  }
                  try {
                      run.run(); // can be interrupted
                  } finally {
                      interruptor.finish(); // can no longer be interrupted
                  }
              };
          
              CompletableFuture<Void> cfRun = CompletableFuture.runAsync(wrap);
          
              // CompletableFuture.cancel() is detected here
              cfRun.whenComplete((r, t) -> {
                  if (t instanceof CancellationException) {
                      interruptor.interrupt();
                  }
              });
          
              return cfRun;
          }
          

          Usage example

          Runnable mySlowIoRun = () -> {
              try {
                  InputStream is = openSomeResource(); // open resource
                  try {
                      // there may be problem (#1) with reading,
                      // such as loss of network connection
                      int bt = is.read();
                      // ..
                      // .. some code
                  } finally {
                      is.close(); // problem (#2): releases any system resources associated with the stream
                  }
              } catch (Throwable th) {
                  throw new RuntimeException(th);
              }
          };
          
          CompletableFuture<Void> cf = runAsyncInterrupted(mySlowIoRun);
          
          try {
              cf.get(5, TimeUnit.SECONDS); // 5 sec timeout
          } catch (Throwable th) {
              cf.cancel(true); // cancel with interrupt mySlowIoRun
              throw th;
          }
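Applied to the try-with-resources scenario from the question, the same idea might look like the sketch below. It is a simplified variant, not the code above: runAsyncInterruptibly, runAndCancel and the class name are hypothetical, and it deliberately omits the synchronization that the Interruptor class uses, so it does not guard against a cancel() that races with the start or the end of the task.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class InterruptOnCancelSketch {

    /** Simplified variant: interrupts the worker thread when the future is cancelled. */
    static CompletableFuture<Void> runAsyncInterruptibly(Runnable task) {
        AtomicReference<Thread> runner = new AtomicReference<>();
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            runner.set(Thread.currentThread());
            try {
                task.run();
            } finally {
                runner.set(null); // after this point cancel() no longer interrupts
            }
        });
        cf.whenComplete((result, error) -> {
            Thread thread = runner.get();
            if (error instanceof CancellationException && thread != null) {
                thread.interrupt();
            }
        });
        return cf;
    }

    /** Cancels a sleeping worker and reports whether its resource was closed. */
    static boolean runAndCancel() {
        AtomicBoolean closed = new AtomicBoolean(false);
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> unwound = new CompletableFuture<>();

        CompletableFuture<Void> async = runAsyncInterruptibly(() -> {
            try (AutoCloseable resource = () -> closed.set(true)) {
                started.complete(null);
                while (true) {
                    Thread.sleep(100); // interrupted via the whenComplete callback
                }
            } catch (Exception e) {
                // Interrupted; try-with-resources has already closed the resource.
            } finally {
                unwound.complete(null);
            }
        });

        started.join();
        async.cancel(true);
        unwound.join();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("resource closed: " + runAndCancel());
    }
}
```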
          

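          【Comments】:

            【Solution 6】:

            So here is a generalization of how I usually handle this problem. Pass in a cancellable state, and close the resource immediately after the stage that opens it.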

            private static BufferedReader openFile(String fn) {
                try {
                    return Files.newBufferedReader(Paths.get(fn));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            
            static class Util {
                static void closeQuietly(AutoCloseable c) {
                    if (c == null) return;
                    try {
                        c.close();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            
                static <T extends AutoCloseable, R> R runThenCloseQuietly(T c, Function<T,R> cb) {
                    try {
                        return cb.apply(c);
                    } finally {
                        closeQuietly(c);
                    }
                }
            
                static <T extends AutoCloseable, R> Optional<R> runThenCloseQuietlyCancellable(BooleanSupplier cancelled
                    , T c, Function<T,Optional<R>> cb) {
                    if (c == null) return Optional.empty(); // safe doesn't throw
                    try {
                        if (cancelled.getAsBoolean()) return Optional.empty(); // might throw, wrap for safety
                        return cb.apply(c); // might throw
                    } finally {
                        closeQuietly(c); // might throw, but at least we're closed
                    }
                }
            
                private static Optional<String> emptyString() {
                    return Optional.empty();
                }
            }
            
            interface Cancellable {
                boolean isCancelled();
                void cancel();
            }
            
            static class CancellableAB implements Cancellable {
                private final AtomicBoolean cancelled;
            
                CancellableAB(AtomicBoolean cancelled) {
                    this.cancelled = cancelled;
                }
            
                @Override
                public boolean isCancelled() {
                    return cancelled.get();
                }
            
                @Override
                public void cancel() {
                    cancelled.set(true);
                }
            }
            static class CancellableArray implements Cancellable {
                private final boolean[] cancelled;
                private final int idx;
                CancellableArray(boolean[] cancelled) {
                    this(cancelled, 0);
                }
                CancellableArray(boolean[] cancelled, int idx) {
                    this.cancelled = cancelled;
                    this.idx = idx;
                }
            
                @Override
                public boolean isCancelled() {
                    return cancelled[idx];
                }
            
                @Override
                public void cancel() {
                    cancelled[idx]=true;
                }
            }
            
            static class CancellableV implements Cancellable {
                volatile boolean cancelled;
            
                @Override
                public boolean isCancelled() {
                    return cancelled;
                }
            
                @Override
                public void cancel() {
                    this.cancelled = true;
                }
            }
            
            /**
             * The only reason this is a class is because we need SOME external object for the lambda to check for mutated
             * cancelled state.
             * This gives the added benefit that we can directly call cancel on the resource.
             * We allow a cancellable to be passed in to CHAIN-IN cancellable state.  e.g. if cancellation should affect MULTIPLE
             * CompletableFuture states, we don't want other promises to tie references to this task.. So the cancellable
             * object can be externalized.
             * 
             * Normally you don't need this much genericism, you can directly implement a volatile 'cancel boolean'.
             * But this allows you to create a C.F. task as a 3rd party library call - gives maximum flexibility to invoker.
             *
             */
            static class FooTask {
                volatile Cancellable cancelled;
                String fileName;
            
                public FooTask(String fileName) {
                    this.fileName = fileName;
                    this.cancelled = new CancellableV();
                }
            
                public FooTask(String fileName, Cancellable cancelled) {
                    this.fileName = fileName; // was missing: run1..run3 need the file name
                    this.cancelled = cancelled;
                }
            
            
                public boolean isCancelled() {
                    return cancelled.isCancelled();
                }
            
                public void cancel() {
                    cancelled.cancel();
                }
            
                /**
                 * asynchronously opens file, scans for first valid line (closes file), then processes the line.
                 * Note if an exception happens, it's the same as not finding any lines. Don't need to special case.
                 * Use of utility functions is mostly for generic-mapping
                 * (avoiding annoying double-type-casting plus editor warnings)
                 */
                CompletableFuture<Optional<Long>> run1() {
                    return
                        CompletableFuture.supplyAsync(() -> openFile(fileName))
                            .thenApplyAsync(c ->  { // this stage MUST close the prior stage
                                    if (c == null) return Util.emptyString(); // nothing to close
                                    try {
                                        if (cancelled.isCancelled()) return Util.emptyString();
                                        return c
                                            .lines()
                                            .filter(line -> !cancelled.isCancelled())
                                            .filter(line -> !line.startsWith("#"))
                                            .findFirst();
                                    } finally {
                                        Util.closeQuietly(c); // close on success, cancellation and failure alike
                                    }
                                }
                            )
                            .thenApplyAsync(oLine -> // this stage doesn't need closing
                                oLine
                                    .map(line -> line.split(":"))
                                    .map(cols -> cols[2])
                                    .map(Long::valueOf)
                                    )
                        ;
                }
            
            
                /**
                 * Same as run1 but avoids messy brackets + try-finally
                 */
                CompletableFuture<Optional<Long>> run2() {
                    return
                        CompletableFuture.supplyAsync(() -> openFile(fileName))
                            .thenApplyAsync(c ->  // this stage MUST close the prior stage
                                Util.runThenCloseQuietly(
                                    c
                                    , r -> cancelled.isCancelled() ? Util.emptyString() // shouldn't throw
                                        : r
                                        .lines()
                                        .filter(line -> !cancelled.isCancelled())
                                        .filter(line -> !line.startsWith("#"))
                                        .findFirst()
                                ))
                            .thenApplyAsync(oLine -> // this stage doesn't need closing
                                oLine
                                    .map(line -> line.split(":"))
                                    .map(cols -> cols[2])
                                    .map(Long::valueOf)
                                    )
                        ;
                }
            
                /**
                 * Same as run2 but avoids needing the ternary operator - says Cancellable in func-name so is more readable
                 */
                CompletableFuture<Optional<Long>> run3() {
                    return
                        CompletableFuture.supplyAsync(() -> openFile(fileName))
                            .thenApplyAsync(c ->  // this stage MUST close the prior stage
                                Util.runThenCloseQuietlyCancellable(
                                cancelled::isCancelled // lambda here is slightly easier to read than explicit if-statement
                                , c
                                , r ->  r
                                        .lines()
                                        .filter(line -> !cancelled.isCancelled())
                                        .filter(line -> !line.startsWith("#"))
                                        .findFirst()
                            ))
                            .thenApplyAsync(oLine -> // this stage doesn't need closing
                                oLine
                                    .map(line -> line.split(":"))
                                    .map(cols -> cols[2])
                                    .map(Long::valueOf)
                                    )
                    ;
                }
            
            }
            
            @Test
            public void testFooGood() {
                var task = new FooTask("/etc/passwd");
                var cf = task.run3();
            
                var oVal = cf.join();
                assertTrue(oVal.isPresent());
                System.out.println(oVal.get()); // should not throw
            }
            
            @Test
            public void testFooCancel() {
                var task = new FooTask("/etc/passwd");
                var cf = task.run3();
                task.cancel();
            
                var oVal = cf.join();
                assertTrue(oVal.isEmpty());
            }
            

            【Comments】:
