【问题标题】:Combine Mono with every Flux element emitted将 Mono 与发射的每个 Flux 元素结合起来
【发布时间】:2018-08-17 02:18:56
【问题描述】:

我有一个 Flux 和 Mono 如下:

Mono<MyRequest> req = request.bodyToMono(MyRequest.class);
Mono<List<String>> mono1 = req.map(r -> r.getList());;
Flux<Long> flux1 = req.map(r -> r.getVals()) // getVals() return list of Long
        .flatMapMany(Flux::fromIterable);

现在对于flux1 中的每个数字,我想调用一个方法,其中参数是来自flux1 的id 和来自mono1List&lt;String&gt;。类似的,

flux1.flatMap(id -> process(id, mono1)) 

但是传递和处理相同的mono1 会导致错误Only one connection receive subscriber allowed。我怎样才能达到上述目标?谢谢!

【问题讨论】:

    标签: reactive-programming spring-webflux project-reactor


    【解决方案1】:

    由于这两个信息都来自同一个来源,您可以像这样使用一个管道运行整个事情,并将两个元素包装在 Tuple 或更好,一个更有意义的域对象中:

    Mono<MyRequest> req = // ...
    Flux<Tuple2<Long, List<String>>> tuples = req.flatMapMany(r ->
            Flux.fromIterable(r.getVals())
                    .map(id -> Tuples.of(id, r.getList()))
    );
    // once there, you can map that with your process method like
    tuples.map(tup -> process(tup.getT1(), tup.getT2());
    

    请注意,这看起来很不寻常,这基本上来自您收到的对象的结构。

    【讨论】:

    • 感谢您的 cmets @brian-clozel。我最终做了req .flatMap(r -&gt; Flux.fromIterable(r.getVals()) .flatMap(id -&gt; process(id, r.getList()))你的答案看起来也不错。
    猜你喜欢
    • 2020-05-04
    • 1970-01-01
    • 1970-01-01
    • 2021-08-05
    • 2019-10-09
    • 2014-12-23
    • 2019-09-21
    • 1970-01-01
    • 2020-07-30
    相关资源
    最近更新 更多