【问题标题】:combine multiple Mono<List<Item>> into one将多个 Mono<List<Item>> 合并为一个
【发布时间】:2020-07-31 20:22:25
【问题描述】:

我目前正在从事一个涉及一些反应式编程的项目。

我有 4 个不同的响应式存储库,分别从中获得 4 个不同的 Mono&lt;List&lt;SomeType&gt;&gt; 作为回报。 目标是将它们组合成一个Mono&lt;List&lt;GeneralType&gt;&gt;,以便将其合并到自定义响应中,以在ResponseEntity.ok() 中返回。我已经创建了一个GeneralType,并且成功地转换了一个Mono&lt;List&lt;SomeType&gt;&gt;,但是,没有进一步的进展。

所有存储库都有相似的签名:

public Mono<List<SomeType>> findAllByUserId(UUID userId)

我的响应中将所有不同列表合并为一个的字段:

private Mono<List<GeneralType>> items;

到目前为止我的方法是什么样的:

public Mono<List<GeneralType>> combineMonos(UUID userId) {
    Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
        .map(list -> list.stream()
            .map(GeneralType::new)
            .collect(Collectors.toList()));
    return combo1; // works just fine
}

所有其他列表都有几乎相同的方法,但是将它们放在一个单一的 Mono 中是一个问题。

我尝试了以下方法:

return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();

但是,IDE 敦促将返回类型更改为Flux&lt;Object&gt;。 此外,有些列表可能是空的,所以我不确定zip() 是否是这里的一个选项。我已经读过,如果至少有一个结果为空,它会将所有内容都返回为 empty

所以问题是如何在没有 block() 无处不在的情况下以有效的方式完成?

【问题讨论】:

  • 空单声道与空列表单声道不同。如果 mono 的值为空列表,则可以继续 zip。但是如果单声道有空值,它就不能继续zip。您可以尝试zip() 运算符将所有Mono 合并为一个。
  • @shafayathossain 非常感谢您的澄清!我有一种感觉,我在某个地方误解了一些东西......

标签: java reactive-programming project-reactor


【解决方案1】:

Merge 急切地连接到所有数据源。因此,当数据从任何来源发出时,它将被传递到下游管道。结果列表中的顺序基于项目的发出时间。

Zip 方法从源中收集数据并将它们放在一个对象(元组 - 类似于盒子)中并传递到下游。只要所有源都发出数据,Zip 就可以工作。任何来源完成/抛出错误,它将停止。


我认为您的个人方法工作正常。您的问题与将结果合并到一个列表中有关。

private Mono<List<String>> getList1(){
    return Mono.just(List.of("a", "b", "c"));
}

private Mono<List<String>> getList2(){
    return Mono.just(Collections.emptyList());
}

private Mono<List<String>> getList3(){
    return Mono.just(List.of("A", "B", "C"));
}


    Flux.merge(getList1(), getList2(), getList3())
            .flatMapIterable(Function.identity())
            .collectList()
            .subscribe(System.out::println);  // [a, b, c, A, B, C]

参考: http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/

【讨论】:

  • 非常感谢!真的很感激!不幸的是,我只能在明天对其进行测试,但是,我有一种强烈的感觉,可以解决问题:) 顺便说一句,我甚至在发布问题之前就已经打开了您的网站。也感谢那里的信息 - 真的很有帮助!
  • @cypherman,我想我误解了你的问题。合并和压缩完全 2 个不同的用例。合并是将 2 个列表合并为一个列表。 Zip 生成一对对象。我以为你想合并回复。也检查另一个答案。如果您认为需要 zip,请接受该答案。
  • 我确实需要合并响应。问题是我有 4 个不同的 Mono&lt;List&lt;GeneralType&gt;&gt;,我需要将它们组合成一个 Mono&lt;List&lt;GeneralType&gt;&gt; 才能返回。
【解决方案2】:

Mono::zip 会将三个发布者异步组合在一起,我认为这是最好的解决方案。

否则这是一个非常简单的问题:

Mono<List<String>> m1 = Mono.just(Arrays.asList(new String[]{"A", "B", "C"}));
Mono<List<Character>> m2 = Mono.just(Arrays.asList(new Character[]{'a', 'b', 'c'}));
Mono<List<Integer>> m3 = Mono.just(Arrays.asList(new Integer[]{1, 2, 3}));
Mono.zip(m1, m2, m3)
        .map(tuple3->{
                List<Combined> c = new ArrayList<>();
                int size = tuple3.getT1().size();
                for ( int i=0; i < size; ++i ) {
                    c.add(new Combined(tuple3.getT1().get(i), tuple3.getT2().get(i), tuple3.getT3().get(i)));
                }
                return c;
        })
        .subscribe(System.out::println);
// [Combined(s=A, c=a, i=1), Combined(s=B, c=b, i=2), Combined(s=C, c=c, i=3)]

为了完整起见:

@Data
@AllArgsConstructor
class Combined {
    String s;
    Character c;
    Integer i;
}

【讨论】:

  • 感谢您的解释和示例!我现在明白Mono::zip 会将它们组合在一起,但该示例并没有让我清楚地知道如何将现有的Mono&lt;List&lt;GeneralType&gt;&gt; 组合成一个。
猜你喜欢
  • 1970-01-01
  • 2014-04-26
  • 1970-01-01
  • 2019-05-25
  • 1970-01-01
  • 2013-12-20
  • 2020-05-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多