【问题标题】:Combine multiple mono in Spring Webflux在 Spring Webflux 中组合多个单声道
【发布时间】:2020-08-12 16:09:27
【问题描述】:

我是 webflux 的新手,我正在尝试使用 Flux 执行多个单声道。但我认为我做错了.. 这是执行多个Mono 并将其收集到列表的最佳方法吗?

这是我的代码:

    mainService.getAllBranch()
            .flatMapMany(branchesList -> {
                List<Branch> branchesList2 = (List<Branch>) branchesList.getData();
                List<Mono<Transaction>> trxMonoList= new ArrayList<>();

                branchesList2.stream().forEach(branch -> {
                    trxMonoList.add(mainService.getAllTrxByBranchId(branch.branchId));
                });
                return Flux.concat(trxMonoList); // <--- is there any other way than using concat?
            })
            .collectList()
            .flatMap(resultList -> combineAllList());
    interface MainService{
            Mono<RespBody> getAllBranch();
            Mono<RespBody> getAllTrxByBranchId(String branchId); //will return executed url ex: http://trx.com/{branchId}
    }

到目前为止,我可以这样解释:

  1. 获取所有分支
  2. 遍历所有branchesList2并将其添加到trxMonoList
  3. 返回Flux.concat,这是我不确定这是否正确的地方。但它正在工作
  4. 合并所有列表

我只是困惑这是在我的上下文中使用Flux 的正确方法吗?还是有更好的方法来实现我正在尝试做的事情?

【问题讨论】:

    标签: java spring-boot spring-webflux project-reactor


    【解决方案1】:

    您需要将您的代码稍微重构为响应式。

     mainService.getAllBranch()
            .flatMapMany(branchesList -> Flux.fromIterable(branchesList.getData())) (1)
            .flatMap(branch -> mainService.getAllTrxByBranchId(branch.branchId))    (2)
            .collectList()
            .flatMap(resultList -> combineAllList());
    

    1) 从 List 中创建 Flux 个分支;

    2) 遍历每个元素并调用服务。

    您不应该在 Reactor 中使用 Stream API,因为它具有相同的方法,但针对多线程进行了适配和优化。

    【讨论】:

    • 您指出的 (2) 似乎不像您所说的那样工作。它不会遍历每个元素并调用服务。它只执行一次。
    • 能否请您将 log() 运算符添加到运算符的链中?
    • 我刚刚又检查了一遍,结果发现当我添加mainService.getAllTrxByBranchId(branch.branchId) .zipWith(cookie) 时,我只得到 1 个列表。那么我怎样才能将我的 cookie 与 branchList 一起传递呢?我试过concateWith,但我不知道该怎么做。
    • 好的,现在解决了。来自flux documentation about zipWith “也就是说,等待两者都发出一个元素并将这些元素组合成一个 Tuple2。” 这就是为什么我只收到列表的第一个元素.
    【解决方案2】:

    这里真正的问题是您不应该在Flux 中多次点击Mono。那会给你带来麻烦。如果你正在设计 API,你应该修复它,以正确的反应方式做你想做的事。

    interface MainService{
            Flux<Branch> getAllBranch();
            Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds);
    }
    

    然后你的代码变得更简单,响应式框架将正常工作。

    mainService.getAllTrxByBranchId(mainService.getAllBranch().map(Branch::getId));
    

    【讨论】:

      猜你喜欢
      • 2018-11-17
      • 2020-10-22
      • 2019-10-06
      • 2018-12-05
      • 2023-03-07
      • 2019-08-09
      • 1970-01-01
      • 1970-01-01
      • 2020-01-22
      相关资源
      最近更新 更多