【问题标题】:CompletableFuture : Invoke a void function asynchronuslyCompletableFuture : 异步调用 void 函数
【发布时间】:2018-03-13 02:51:31
【问题描述】:

我正在尝试对某些数据库异常执行带有重试策略的数据库查询。重试策略的代码不是很相关,所以我没有包含它。正如您在下面的代码中看到的 - 我编写了一个 retryCallable,它采用重试策略和 populateData() 中的 Callable。

getDataFromDB 中,我从数据库中获取数据并将数据放入一个全局哈希映射中,该哈希映射用作应用程序级别的缓存。

此代码按预期工作。我想从另一个类调用populateData。但是,这将是一个阻塞调用。由于这是数据库并且具有重试策略,因此这可能会很慢。我想异步调用populateData

如何使用 CompletableFuture 或 FutureTask 来实现这一点? CompletableFuture.runAsync 需要一个可运行的。 CompletableFuture.supplyAsync 需要一个供应商。我以前没有实现过这些东西。因此,任何关于最佳实践的建议都会有所帮助。

Class TestCallableRetry {

public void populateData() {
        final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        Set<String> data = new HashSet<>();

        data = retryCallable.call();

        if (data != null && !data.isEmpty()) {
            // store data in a global hash map
        }
    }

    private Callable<Set<Building>> getDataFromDB() {
        return new Callable<Set<String>>() {
            @Override
            public Set<String> call() {
                // returns data from database
            }
        };
    }
}

Class InvokeCallableAsynchronously {
    public void putDataInGlobalMap {
      // call populateData asynchronously
    }
}

【问题讨论】:

    标签: java runnable callable completable-future


    【解决方案1】:

    如果您将populateData 方法分成两部分,一个Supplier 用于获取数据,另一个Consumer 用于存储数据,则很容易将它们与CompletableFuture 链接起来。

    // Signature compatible with Supplier<Set<String>> 
    private Set<String> fetchDataWithRetry() {
        final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        try {
            return retryCallable.call();
        } catch (Exception e) {
            log.error("Call to database failed", e);
            return Collections.emptySet();
        }
    }
    
    // Signature compatible with Consumer<Set<String>>
    private void storeData(Set<String> data) {
        if (!data.isEmpty()) {
            // store data in a global hash map
        }
    }
    

    然后,在populateData()

    private ExecutorService executor = Executors.newCachedThreadPool();
    
    public void populateData() {
        CompletableFuture
            .supplyAsync(this::fetchDataWithRetry, executor)
            .thenAccept(this::storeData);
    }
    

    使用带有ExecutorsupplyAsync 版本是可选的。如果您使用单 arg 版本,您的任务将在公共池中运行;适用于短期运行的任务,但不适用于阻塞的任务。

    【讨论】:

    • 又好又简单的一个,因为我更改了getDataFromDB() 签名
    • 非常感谢您的回答。我会试试看。
    【解决方案2】:

    您可以在 CompletableFuture 中组合多种实用方法,所有这些方法都值得一试。

    让我们从populateData 方法开始。通过它的名字,你可以推断它应该接受来自某个地方的数据流。

    它的签名可能如下所示:

    void populateData ( Supplier<? extends Collection<Building> dataSupplier );
    

    Supplier,顾名思义,就是为我们提供一些数据的东西。

    getDataFromDB() 似乎适合作为Supplier 角色。

    private Set<Building> getDataFromDB() // supply a building's collection
    

    我们希望populateData 执行asynchronously 并返回一个结果,无论操作是否正确执行。

    所以,在未来,populateData 可能会回来,告诉我们事情进展如何。

    让我们将签名转换为:

    CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier);
    

    现在让我们看看方法体的样子:

    CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier) {
        return CompletableFuture                // create new completable future from factory method
                .supplyAsync(supplier)          // execute the supplier method (getDataFromDB() in our case) 
                .thenApplyAsync(data -> {       // here we can work on the data supplied
                    if (data == null || data.isEmpty()) return new Result(false);
                    // some heavy operations
                    for (Building building : data) {
                        // do something
                    }
    
                    return new Result(true); // return dummy positive result data            
                 })
                 .handleAsync((result, throwable) -> {
                    // check if there was any exception
                    if (throwable != null) {
                        // check if exception was thrown
                        Log.log(throwable);
                        return new Result(false);
                    }
                    return result;
                });
    }
    

    现在我们可以从某个地方调用populateData,并在异步执行完成后应用另一个回调来执行。

    populateData(TestCallableRetry::getDataFromDB).thenAccept( result -> {
            if ( ! result.success ) {
                // things went bad... retry ??
            }
        });
    

    现在取决于您希望如何应用您的重试策略。如果您只想重试一次,您可以在thenAcceptAsync 内再次拨打populateData

    您还应该在您的供应商方法中catch 异常并将它们转换为java.util.concurrent.CompletionException,因为它们在CompletableFuture 中得到顺利处理。

    【讨论】:

      【解决方案3】:

      它非常简单,因为 java8 只需使用 CompletableFuture.runAsync(() -> object.func());

      【讨论】:

      • 这可能由于其长度而被自动标记。像这样的简短答案通常不能提供足够的上下文来真正有用。在这种情况下,描述如何使用建议的解决方案将是一个很大的改进。
      • 嘿,请帮助我理解为什么这不能回答问题以及如何使用建议的解决方案是什么意思?我应该添加'如果对象是用户可以调用的TestCallableRetry 的实例-CompletableFuture.runAsync(() -&gt; object.populateData());'
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-07-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-17
      • 2013-03-28
      • 2018-12-13
      相关资源
      最近更新 更多