【问题标题】:Closing external process in CompletableFuture chain关闭 CompletableFuture 链中的外部进程
【发布时间】:2016-11-16 09:26:52
【问题描述】:

我正在寻找更好的方法来“关闭”一些资源,这里销毁外部Process,在CompletableFuture 链中。现在我的代码大致是这样的:

public CompletableFuture<ExecutionContext> createFuture()
{
    final Process[] processHolder = new Process[1];
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    processHolder[0] = new ProcessBuilder(COMMAND)
                            .redirectErrorStream(true)
                            .start();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return PARSER.parse(processHolder[0].getInputStream());
            }, SCHEDULER)
            .applyToEither(createTimeoutFuture(DURATION), Function.identity())
            .exceptionally(throwable -> {
                processHolder[0].destroyForcibly();
                if (throwable instanceof TimeoutException) {
                    throw new DatasourceTimeoutException(throwable);
                }
                Throwables.propagateIfInstanceOf(throwable, DatasourceException.class);
                throw new DatasourceException(throwable);
            });
}

我看到的问题是一个“hacky”单元素数组,它包含对进程的引用,因此可以在出现错误时关闭它。是否有一些CompletableFuture API 允许将一些“上下文”传递给exceptionally(或其他一些方法来实现)?

我正在考虑自定义 CompletionStage 实现,但摆脱“持有人”变量似乎是一项艰巨的任务。

【问题讨论】:

    标签: java java-8 completable-future


    【解决方案1】:

    不需要CompletableFutures 的线性链。好吧,实际上,您还没有因为createTimeoutFuture(DURATION) 而实现超时非常复杂。你可以这样说:

    public CompletableFuture<ExecutionContext> createFuture() {
        CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
            () -> {
                try {
                    return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, SCHEDULER);
        CompletableFuture<ExecutionContext> result
            =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
        proc.thenAcceptAsync(process -> {
            if(!process.waitFor(DURATION, TimeUnit.WHATEVER_DURATION_REFERS_TO)) {
                process.destroyForcibly();
                result.completeExceptionally(
                    new DatasourceTimeoutException(new TimeoutException()));
            }
        });
        return result;
    }
    

    如果你想保持 timout 未来,也许你认为进程启动时间很重要,你可以使用

    public CompletableFuture<ExecutionContext> createFuture() {
        CompletableFuture<Throwable> timeout=createTimeoutFuture(DURATION);
        CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
            () -> {
                try {
                    return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, SCHEDULER);
        CompletableFuture<ExecutionContext> result
            =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
        timeout.exceptionally(t -> new DatasourceTimeoutException(t))
               .thenAcceptBoth(proc, (x, process) -> {
                    if(process.isAlive()) {
                        process.destroyForcibly();
                        result.completeExceptionally(x);
                    }
                });
        return result;
    }
    

    【讨论】:

    • 第一个 sn-p 我不能使用,因为 .waitFor 阻塞并且我想消耗可能非常大的 InputStream(因此 .redirectErrorStream(true),我稍后有“错误检测”,但这是另一回事)。第二种方式似乎有点复杂(3个future,包括CompletableFuture&lt;Throwable&gt;),但我会试一试。
    • 它在 异步 操作中阻塞。这就是thenAcceptAsync 或者好吧,CompleteableFuture 阶段的全部意义;独立的阶段可以同时运行。在这里,PARSER.parseprocess.waitFor 同时在独立的阶段运行。
    • 啊,我明白了。那就试试waitFor吧!
    • @Holger,第一种方式,您为每次调用createFuture 阻塞了一个超时线程。如果您根据TimerScheduledExecutorService' 的单个实例实现createTimeoutFuture,您将使用一组专用线程(可能只有一个)来处理超时。第二种方法是完全错误的,因为 1)您在超时未来使用 exceptionally,并且 2)如果 proc 抛出异常,.thenAcceptBoth 设置的延续也会异常完成并且不会运行.
    • @acelent:阻塞后台线程本身不是问题,您可以拥有数千个阻塞线程而不会影响性能。如果真的成为性能问题,则需要使用线程池的替代解决方案,否则只是过早的优化。此外,原始代码清楚地表明超时未来将在时间过去后异常完成。因此,在其上使用exceptionally 是正确的。可能还有改进的余地,反正就是为了证明逻辑可以不用单项数组来建模,仅此而已。
    【解决方案2】:

    我自己使用了一项数组来模拟 Java 中的正确闭包。

    另一种选择是使用带字段的私有静态类。优点是它使目的更清晰,并且对具有大闭包的垃圾收集器的影响较小,即具有 N 个字段的对象与长度为 1 的 N 个数组。如果您需要关闭相同的字段,它也会变得有用其他方法。

    这是一个 de facto 模式,甚至超出了CompletableFuture 的范围,并且早在 lambda 成为 Java 中的东西之前,它就已经(ab)使用了,例如匿名类。所以,不要难过,只是 Java 的发展没有为我们提供适当的闭包(还没有?曾经?)。

    如果你愿意,你可以从CompletableFutures 内部的.handle() 返回值,这样你就可以完整地包装完成结果并返回一个包装器。在我看来,这并不比手动关闭更好,还补充说您将在未来创建此类包装器。

    子类化CompletableFuture 不是必需的。你对改变它的行为不感兴趣,只对附加数据感兴趣,你可以通过当前 Java 的 final 变量捕获来做到这一点。也就是说,除非您分析并发现创建这些闭包实际上会以某种方式影响性能,我对此高度怀疑。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-27
      • 1970-01-01
      • 2010-10-05
      • 2015-01-18
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多