【问题标题】:How to check when all CompleteableFuture are done?如何检查所有 CompletableFuture 何时完成?
【发布时间】:2023-03-22 13:02:01
【问题描述】:

我有一个Stream<Item>,我将其映射到CompleteableFuture<ItemResult>

我想做的是知道所有期货何时完成。 有人可能会建议:

  1. 将所有期货收集到一个数组中并使用 CompleteableFuture.allOf()。这有点问题,因为可能有数十万个项目
  2. 继续forEach(CompleteableFuture::join)。这也是有问题的,因为使用 join 调用 forEach 只会阻塞流,它本质上是串行处理而不是并发处理
  3. 在流​​的末尾注入有毒物品。这可以工作,但在我看来它不是那么优雅
  4. 检查执行程序队列是否为空 - 这是非常有限的,因为我将来可能会使用多个执行程序。此外,队列可能暂时为空
  5. 改为监控数据库并检查新项目的数量

我觉得所有建议的解决方案都不够好。

监控期货的适当方法是什么?

谢谢

编辑:
我想到的另一个(模糊)想法是使用计数器并等待它下降到零。但同样,需要检查它不是暂时的 0..

【问题讨论】:

  • 第一个命题有什么问题?这就是应该用来知道所有期货何时完成
  • @sp00m,我认为它不能很好地扩展。如果我想处理 1000 万个项目怎么办? 1 亿个项目?
  • 也许CountDownLatch 可能是另一种方法?
  • 你的问题没有意义。您希望完成所有 CF,但您也不想等待所有 CF 都完成,因为可能太多了?你想在这里完成什么?
  • @sp00m Phaser 也是我首先想到的。但是它的 65535 的限制不适合 100M 的项目。

标签: java multithreading asynchronous completable-future


【解决方案1】:

免责声明:我不确定Phaser 是否是正确的工具,如果是的话,是否最好让一个根与多个孩子或像我在下面建议的那样将它们链接起来,所以感觉欢迎纠正我。


这是使用Phaser 的一种方法。

Phaser 有有限数量的当事人,因此如果即将达到该限制,我们需要创建一个新的Phaser: p>

private Phaser register(Phaser phaser) {
    if (phaser.getRegisteredParties() < 65534) {
        // warning: side-effect,
        // conflicts with AtomicReference#updateAndGet recommendation,
        // might not fit well if the Stream is parallel:
        phaser.register();
        return phaser;
    } else {
        return new Phaser(phaser, 1);
    }
}

针对该 Phaser 链注册每个 CompletableFuture,并在完成后取消注册:

private void register(CompletableFuture<?> future, AtomicReference<Phaser> phaser) {
    Phaser registeredPhaser = phaser.updateAndGet(this::register);
    future
            .thenRun(registeredPhaser::arriveAndDeregister)
            .exceptionally(e -> {
                // log e?
                registeredPhaser.arriveAndDeregister();
                return null;
            });
}

等待所有期货完成:

private <T> void await(Stream<CompletableFuture<T>> futures) {
    Phaser rootPhaser = new Phaser(1);
    AtomicReference<Phaser> phaser = new AtomicReference<>(rootPhaser);
    futures.forEach(future -> register(future, phaser));
    rootPhaser.arriveAndAwaitAdvance();
    rootPhaser.arriveAndDeregister();
}

例子:

ExecutorService executor = Executors.newFixedThreadPool(500);

// creating fake stream with 500,000 futures:
Stream<CompletableFuture<Integer>> stream = IntStream
        .rangeClosed(1, 500_000)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
                if (i % 50_000 == 0) {
                    System.out.println(Thread.currentThread().getName() + ": " + i);
                }
                return i;
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }, executor));

// usage:
await(stream);
System.out.println("Done");

输出:

pool-1-thread-348: 50000
pool-1-thread-395: 100000
pool-1-thread-333: 150000
pool-1-thread-30: 200000
pool-1-thread-120: 250000
pool-1-thread-10: 300000
pool-1-thread-241: 350000
pool-1-thread-340: 400000
pool-1-thread-283: 450000
pool-1-thread-176: 500000
Done

【讨论】:

    猜你喜欢
    • 2018-06-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-03-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多