【Question title】: CompletableFuture from Callable?
【Posted】: 2015-05-31 16:24:30
【Question】:

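Today I tried the "new" CompletableFuture in Java 8 and was confused when I could not find a runAsync(Callable) method. I can write it myself, as shown below, but why is this utility method (which seems very obvious and useful to me) missing? Am I missing something?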

public static <T> CompletableFuture<T> asFuture(Callable<? extends T> callable, Executor executor) {
    CompletableFuture<T> future = new CompletableFuture<>();
    executor.execute(() -> {
        try {
            future.complete(callable.call());
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
    });
    return future;
}
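
A minimal usage sketch of the helper above, assuming a hypothetical task `load` that declares a checked exception and a same-thread executor (`Runnable::run`) chosen only to keep the demo deterministic. The class name `AsFutureDemo` is also made up for illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

final class AsFutureDemo {
    // Same helper as in the question.
    static <T> CompletableFuture<T> asFuture(Callable<? extends T> callable, Executor executor) {
        CompletableFuture<T> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                future.complete(callable.call());
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    // Hypothetical task whose signature declares a checked exception.
    static String load(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("boom");
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Assumption: a same-thread executor, so both futures are complete on return.
        Executor direct = Runnable::run;

        CompletableFuture<String> ok = asFuture(() -> load(false), direct);
        System.out.println(ok.join()); // ok

        CompletableFuture<String> failed = asFuture(() -> load(true), direct);
        try {
            failed.join();
        } catch (CompletionException e) {
            // join() wraps the original checked exception as the cause.
            System.out.println(e.getCause()); // java.io.IOException: boom
        }
    }
}
```

The lambda `() -> load(true)` compiles because Callable.call() declares `throws Exception`; the same lambda would be rejected as a Supplier.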

【Comments】:

Tags: java concurrency java-8 java.util.concurrent


【Answer 1】:

You should use supplyAsync(Supplier<U>).

In general, lambdas and checked exceptions do not work well together, and CompletableFuture avoids checked exceptions by design. In your case it should be fine, though.
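
If the task does throw checked exceptions, one way to still use supplyAsync is to adapt the Callable into a Supplier that rethrows them as an unchecked CompletionException. This is only a sketch: `SupplyAsyncDemo`, `unchecked`, and `parse` are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class SupplyAsyncDemo {
    // Hypothetical adapter: wraps checked exceptions so the Callable fits
    // the Supplier signature expected by supplyAsync.
    static <T> Supplier<T> unchecked(Callable<? extends T> callable) {
        return () -> {
            try {
                return callable.call();
            } catch (RuntimeException e) {
                throw e; // unchecked exceptions pass through unchanged
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    // Hypothetical task with a checked exception in its signature.
    static int parse(String s) throws Exception {
        if (s.isEmpty()) {
            throw new Exception("empty input");
        }
        return Integer.parseInt(s);
    }

    public static void main(String[] args) {
        Supplier<Integer> good = unchecked(() -> parse("42"));
        System.out.println(CompletableFuture.supplyAsync(good).join()); // 42

        Supplier<Integer> bad = unchecked(() -> parse(""));
        try {
            CompletableFuture.supplyAsync(bad).join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // empty input
        }
    }
}
```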

Related threads:

http://cs.oswego.edu/pipermail/concurrency-interest/2012-December/010486.html

http://cs.oswego.edu/pipermail/concurrency-interest/2014-August/012911.html

【Comments】:

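  • That makes it look even more inconsistent to me. Yes, it does let you handle exceptions, e.g. with whenComplete. But the only way to get an exception "into" the pipeline is to break the fluent style and write your own static helper method that calls completeExceptionally. That reads really ugly in code.
  • There was a lot of discussion about this on the mailing list. In the end I was not convinced by Doug Lea, and I find the whole thing unfriendly to programmers. I ended up writing my own Async interface.
  • Yes, I saw it. The API there is well done; it is exactly what I would have expected CompletableFuture to look like.
  • But there needs to be an official API so that different libraries can interoperate. Sigh.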
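
The asymmetry described in the first comment can be sketched roughly as follows: getting an exception out of a future stays fluent, while putting a checked exception in needs a hand-written helper. `WhenCompleteDemo`, `failed`, and `describe` are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

final class WhenCompleteDemo {
    // Hypothetical static helper: a checked exception only gets "into" a
    // future by completing it exceptionally by hand.
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    // Getting the exception "out" stays fluent: handle receives either the
    // value or the throwable.
    static String describe(CompletableFuture<String> future) {
        return future
                .handle((value, error) -> error == null
                        ? "value: " + value
                        : "error: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture("done")));    // value: done
        System.out.println(describe(failed(new IOException("disk full"))));         // error: disk full
    }
}
```
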
【Answer 2】:

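For those who need the functionality presented in the question: I changed it slightly so that it ideally mirrors the behavior of the existing CompletableFuture.runAsync and CompletableFuture.supplyAsync functions.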

// Java 8 (Java 9+ version below)
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * {@link CompletableFuture} utils.
 *
 * @author stonar96
 *
 * @see CompletableFuture
 */
public final class CompletableFutureUtils {
    /**
     * Delegates the given Callable to
     * {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
     * in the same way as unchecked exceptions and associates a new CompletableFuture.
     *
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param          <U> the function's return type
     * @return the new associated CompletableFuture
     * @see CompletableFuture#supplyAsync(Supplier)
     */
    public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
        return completeAsync(new CompletableFuture<>(), callable);
    }

    /**
     * Delegates the given Callable to
     * {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
     * in the same way as unchecked exceptions and associates the given
     * CompletableFuture.
     *
     * @param result   the CompletableFuture to be associated
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param          <T> the function's return type
     * @return the given associated CompletableFuture
     * @see CompletableFuture#supplyAsync(Supplier)
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
        if (result == null) {
            throw new NullPointerException();
        }

        CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
            try {
                return callable.call();
            } catch (Throwable t) {
                if (t instanceof Error) {
                    throw (Error) t;
                }

                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }

                result.completeExceptionally(t);
            }

            return null;
        });

        if (delegate == null) {
            return null;
        }

        result.whenComplete((v, t) -> {
            if (t == null) {
                delegate.complete(v);
                return;
            }

            delegate.completeExceptionally(t);
        });
        delegate.whenComplete((v, t) -> {
            if (t == null) {
                result.complete(v);
                return;
            }

            result.completeExceptionally(t);
        });
        return result;
    }

    /**
     * Delegates the given Callable and Executor to
     * {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
     * exceptions in the same way as unchecked exceptions and associates a new
     * CompletableFuture.
     *
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param          <U> the function's return type
     * @return the new associated CompletableFuture
     * @see CompletableFuture#supplyAsync(Supplier, Executor)
     */
    public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
        return completeAsync(new CompletableFuture<>(), callable, executor);
    }

    /**
     * Delegates the given Callable and Executor to
     * {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
     * exceptions in the same way as unchecked exceptions and associates the given
     * CompletableFuture.
     *
     * @param result   the CompletableFuture to be associated
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param          <T> the function's return type
     * @return the given associated CompletableFuture
     * @see CompletableFuture#supplyAsync(Supplier, Executor)
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
        if (result == null) {
            throw new NullPointerException();
        }

        CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
            try {
                return callable.call();
            } catch (Throwable t) {
                if (t instanceof Error) {
                    throw (Error) t;
                }

                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }

                result.completeExceptionally(t);
            }

            return null;
        }, executor);

        if (delegate == null) {
            return null;
        }

        result.whenComplete((v, t) -> {
            if (t == null) {
                delegate.complete(v);
                return;
            }

            delegate.completeExceptionally(t);
        });
        delegate.whenComplete((v, t) -> {
            if (t == null) {
                result.complete(v);
                return;
            }

            result.completeExceptionally(t);
        });
        return result;
    }

    private CompletableFutureUtils() {
        throw new AssertionError("CompletableFutureUtils cannot be instantiated");
    }
}

// Java 9+
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * {@link CompletableFuture} utils.
 *
 * @author stonar96
 *
 * @see CompletableFuture
 */
public final class CompletableFutureUtils {
    /**
     * Delegates the given Callable to
     * {@link CompletableFuture#completeAsync(Supplier)} using a new
     * CompletableFuture and handles checked exceptions in the same way as unchecked
     * exceptions.
     *
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param          <U> the function's return type
     * @return the new CompletableFuture
     * @see CompletableFuture#completeAsync(Supplier)
     */
    public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
        return completeAsync(new CompletableFuture<>(), callable);
    }

    /**
     * Delegates the given Callable to
     * {@link CompletableFuture#completeAsync(Supplier)} using the given
     * CompletableFuture and handles checked exceptions in the same way as unchecked
     * exceptions.
     *
     * @param result   the CompletableFuture to be used
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param          <T> the function's return type
     * @return the given CompletableFuture
     * @see CompletableFuture#completeAsync(Supplier)
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
        return result.completeAsync(callable == null ? null : () -> {
            try {
                return callable.call();
            } catch (Throwable t) {
                if (t instanceof Error) {
                    throw (Error) t;
                }

                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }

                result.completeExceptionally(t);
            }

            return null;
        });
    }

    /**
     * Delegates the given Callable and Executor to
     * {@link CompletableFuture#completeAsync(Supplier, Executor)} using a new
     * CompletableFuture and handles checked exceptions in the same way as unchecked
     * exceptions.
     *
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param          <U> the function's return type
     * @return the new CompletableFuture
     * @see CompletableFuture#completeAsync(Supplier, Executor)
     */
    public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
        return completeAsync(new CompletableFuture<>(), callable, executor);
    }

    /**
     * Delegates the given Callable and Executor to
     * {@link CompletableFuture#completeAsync(Supplier, Executor)} using the given
     * CompletableFuture and handles checked exceptions in the same way as unchecked
     * exceptions.
     *
     * @param result   the CompletableFuture to be used
     * @param callable a function returning the value to be used to complete the
     *                 returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param          <T> the function's return type
     * @return the given CompletableFuture
     * @see CompletableFuture#completeAsync(Supplier, Executor)
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
        return result.completeAsync(callable == null ? null : () -> {
            try {
                return callable.call();
            } catch (Throwable t) {
                if (t instanceof Error) {
                    throw (Error) t;
                }

                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }

                result.completeExceptionally(t);
            }

            return null;
        }, executor);
    }

    private CompletableFutureUtils() {
        throw new AssertionError("CompletableFutureUtils cannot be instantiated");
    }
}

As you can see, everything is delegated as is, except for checked exceptions, which have to be handled.
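
To show what a caller would observe, here is a usage sketch built around a condensed, simplified stand-in for callAsync. It is not the full class above (it links the two futures in one direction only), and `CallAsyncDemo` and `fetch` are hypothetical names. It compiles on Java 8.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class CallAsyncDemo {
    // Condensed stand-in for callAsync: delegates to supplyAsync and
    // completes the result exceptionally when a checked exception is thrown.
    static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
        CompletableFuture<U> result = new CompletableFuture<>();
        CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (RuntimeException | Error e) {
                throw e; // left to supplyAsync, exactly as without the wrapper
            } catch (Throwable t) {
                result.completeExceptionally(t); // checked exception
                return null;
            }
        }).whenComplete((value, error) -> {
            // No-op if the checked-exception path already completed result.
            if (error == null) {
                result.complete(value);
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    // Hypothetical task that declares a checked exception.
    static String fetch(boolean fail) throws Exception {
        if (fail) {
            throw new TimeoutException("timed out");
        }
        return "payload";
    }

    public static void main(String[] args) {
        System.out.println(callAsync(() -> fetch(false)).join()); // payload

        String kind = callAsync(() -> fetch(true))
                .handle((value, error) -> error.getClass().getSimpleName())
                .join();
        System.out.println(kind); // TimeoutException
    }
}
```

In this sketch the checked exception reaches `handle` unwrapped, because it is passed straight to completeExceptionally on the returned future.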

【Comments】:
