【问题标题】:Project Reactor: Wait for list of Mono's to completeProject Reactor:等待 Mono 的列表完成
【发布时间】:2020-01-27 15:29:14
【问题描述】:

我希望代码等待 Mono 完成然后收集结果,但它永远不会发生。

为什么?

这是我的代码:

public static void main(String[] args) throws Exception {
        Mono<Integer> mono1 = Mono.fromCallable(() -> 1);
        Mono<Integer> mono2 = Mono.fromCallable(() -> 2);
        List<Mono<Integer>> monos = Arrays.asList(mono1, mono2);

        Mono
                .when(monos)
                .subscribe(__ -> {
                    int i1 = mono1.block();
                    int i2 = mono1.block();

                    System.out.println(i1 + i2);
                });

        Thread.currentThread().join();
    }

【问题讨论】:

  • 从不要求在订阅中阻塞,这是一种反模式。您实际上通过执行此操作多次触发可调用对象(一次按时间,一次按块)。 when 是关于等待完成,忽略元素及其类型。 zip 将允许您组合几个 valued 单声道的值。但从你的其他 cmet 看来,你似乎在追求更不同的东西。

标签: java project-reactor


【解决方案1】:

您的 Mono.when() 调用是 Mono&lt;Void&gt; 类型的 - 它只是完成(或错误,如果它的发布者之一返回错误。)因此永远不会发出元素,因此永远不会调用 subscribe(),因此该订阅块中的代码永远不会执行。

目前尚不清楚您想在这里发生什么,但最快的“修复”可能是在订阅之前到materialize(),因此您会得到onComplete() 信号作为元素传播:

Mono
        .when(monos)
        .materialize()
        .subscribe(__ -> {
            int i1 = mono1.block();
            int i2 = mono1.block();

            System.out.println(i1 + i2);
        });

请注意,这将打印2,而不是3,因为i1i2 都引用mono1

【讨论】:

  • 我有一个Flux&lt;IN&gt;。我想将它映射到Flux&lt;List&lt;OUT&gt;&gt;,其中每个OUT 都是一些Function&lt;IN, OUT&gt; 的产物。而且,我想异步进行。也就是说,我们等待所有函数完成并将其收集到OUTs 的列表中。
  • 我应该出现在问题中 - 抱歉。
  • @yaseco 你的意思是映射到Mono&lt;List&lt;OUT&gt;&gt;吗?如果您真的想将Flux&lt;IN&gt; 映射到Flux&lt;List&lt;OUT&gt;&gt;,则需要指定窗口详细信息(第一个列表中包含哪些元素,第二个列表中包含哪些元素等)
  • 是的,我的意思是Flux&lt;List&lt;OUT&gt;。顺序对我来说并不重要 - 这样会更容易吗?
  • Flux&lt;IN&gt;.map(FUNCTION).collectList() 会做到这一点,反应性地将函数应用于每个输入元素,并将结果添加到源完成时发出的单个列表中
猜你喜欢
  • 1970-01-01
  • 2018-07-13
  • 1970-01-01
  • 2021-10-13
  • 2019-01-07
  • 2021-06-12
  • 2015-08-24
  • 2020-09-25
  • 2018-12-01
相关资源
最近更新 更多