【问题标题】:CompletableFuture: Await percentage completeCompletableFuture:等待完成百分比
【发布时间】:2021-06-02 06:40:59
【问题描述】:

我正在向分布式系统的 n 个节点并行写入相同的数据。
当这些节点中的 n% 已成功写入时,其余对其他节点的写入并不重要,因为 n% 保证了其他节点之间的复制。

Java 的 CompletableFuture 似乎与我想要的非常接近,例如:

CompletableFuture.anyOf()

(在第一个 future 完成时返回)- 避免不必要的等待,但在我需要 n% 完成时返回太快

CompletableFuture.allOf()

(在所有期货完成时返回)- 避免过早返回但不必要地等待 100% 完成

我正在寻找一种在特定百分比的期货完成后返回的方法。 例如,如果我提供 10 个期货,当其中 6% 或 60% 成功完成时返回。

例如,Bluebird JS 具有此功能

Promise.some(promises, countThatNeedToComplete)

我想知道是否可以在 java 中使用 TheadExecutor 或 vanilla CompletableFuture 做类似的事情

【问题讨论】:

  • 您可以只查看这些方法的源代码,了解其工作原理,然后构建您自己的逻辑以满足您的需求。
  • here,第 2690 和 2784 行。
  • 谢谢,我编写自己的实现没有问题,但认为这一定是一个常见问题,可以通过 vanilla api 或库解决,如果不是这样,我就写我的自己的实现
  • 你看过CountDownLatch吗?

标签: java parallel-processing future percentage completable-future


【解决方案1】:

我相信您可以仅使用 CompletableFuture 已经提供的内容来实现您想要的,但是您必须实施额外的控制才能知道有多少未来任务已经完成,以及当您达到您想要完成的数量/百分比时需要,取消剩余的任务。

下面是一个类来说明这个想法:

public class CompletableSome<T>
{
private List<CompletableFuture<Void>> tasks;
private int tasksCompleted = 0;

public CompletableSome(List<CompletableFuture<T>> tasks, int percentOfTasksThatMustComplete)
{
    int minTasksThatMustComplete = tasks.size() * percentOfTasksThatMustComplete / 100;
    System.out.println(
        String.format("Need to complete at least %s%% of the %s tasks provided, which means %s tasks.",
            percentOfTasksThatMustComplete, tasks.size(), minTasksThatMustComplete));

    this.tasks = new ArrayList<>(tasks.size());
    for (CompletableFuture<?> task : tasks)
    {
        this.tasks.add(task.thenAccept(a -> {
            // thenAccept will be called right after the future task is completed. At this point we'll
            // check if we reached the minimum number of nodes needed. If we did, then complete the 
            // remaining tasks since they are no longer needed.
            tasksCompleted++;
            if (tasksCompleted >= minTasksThatMustComplete)
            {
                tasks.forEach(t -> t.complete(null));
            }
        }));
    }
}

public void execute()
{
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0])).join();
}
}

你可以像下面的例子那样使用这个类:

public static void main(String[] args)
{
    int numberOfNodes = 4;

    // Create one future task for each node.
    List<CompletableFuture<String>> nodes = new ArrayList<>();
    for (int i = 1; i <= numberOfNodes; i++)
    {
        String nodeId = "result" + i;
        nodes.add(CompletableFuture.supplyAsync(() -> {
            try
            {
                // Sleep for some time to avoid all tasks to complete before the count is checked.
                Thread.sleep(100 + new Random().nextInt(500));
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }

            // The action here is just to print the nodeId, you would make the actual call here.
            System.out.println(nodeId + " completed.");
            return nodeId;
        }));
    }

    // Here we're saying that just 75% of the nodes must be called successfully.
    CompletableSome<String> tasks = new CompletableSome<>(nodes, 75);
    tasks.execute();
}

请注意,使用此解决方案,您最终可能会执行比最低要求更多的任务 - 例如,当两个或更多节点几乎同时响应时,您可能会在第一个节点响应时达到所需的最低计数,但会没时间取消其他任务。如果这是一个问题,那么您将不得不实施更多控制。

【讨论】:

  • 谢谢,这让我走上了正轨。我需要返回一个值,如果抛出异常,上述情况会过早存在,但我已经解决了这些问题。
  • 当您从异步作业访问共享资源时,您应该了解线程安全。你不能只做tasksCompleted++;。而且,顺便说一句,我不明白为什么这么多人写System.out.println(String.format(…)) 而不是System.out.printf(…)
  • @holger 正如我在上一段的注释中提到的那样,是的 - 显然 - 我没有涉及同步方面。但它们不会影响问题中要求的结果,这就是我不担心它们的原因。
猜你喜欢
  • 2018-11-08
  • 2014-04-11
  • 2019-05-14
  • 1970-01-01
  • 1970-01-01
  • 2021-12-13
  • 2021-07-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多