【问题标题】:How to combine the elements of an arbitrary number of dependent Fluxes?如何组合任意数量的依赖通量的元素?
【发布时间】:2018-12-06 20:06:38
【问题描述】:

在非反应性世界中,以下代码 sn-p 没什么特别的:

interface Enhancer {
    Result enhance(Result result);
}

Result result = Result.empty();
result = fooEnhancer.enhance(result);
result = barEnhancer.enhance(result);
result = bazEnhancer.enhance(result);

共有三种不同的 Enhancer 实现采用 Result 实例,对其进行增强并返回增强的结果。让我们假设增强器调用的顺序很重要。

现在,如果这些方法被返回 Flux<Result> 的响应式变体替换怎么办?因为方法依赖于前面方法的结果,所以这里不能使用combineLatest

一个可能的解决方案是:

Flux.just(Result.empty())
    .switchMap(result -> first(result)
        .switchMap(result -> second(result)
            .switchMap(result -> third(result))))
    .subscribe(result -> doSomethingWith(result));

请注意,switchMap 调用是嵌套的。因为我们只对最终结果感兴趣,所以我们让switchMap 在前面的通量中发出新事件后立即切换到下一个通量。

现在让我们尝试使用动态数量的通量来实现。非反应性(没有助焊剂),这也没什么特别的:

List<Enhancer> enhancers = <ordered list of different Enhancer impls>;
Result result = Result.empty();
for (Enhancer enhancer : enhancers) {
    result = enhancer.enhance(result);
}

但是我如何用三个通量概括上述反应性示例来处理任意数量的通量?

【问题讨论】:

    标签: reactive-programming project-reactor


    【解决方案1】:

    我找到了使用递归的解决方案:

    @FunctionalInterface
    interface FluxProvider {
        Flux<Result> get(Result result);
    }
    
    // recursive method creating the final Flux
    private Flux<Result> cascadingSwitchMap(Result input, List<FluxProvider> fluxProviders, int idx) {
        if (idx < fluxProviders.size()) {
            return fluxProviders.get(idx).get(input).switchMap(result -> cascadingSwitchMap(result, fluxProviders, idx + 1));
        }
        return Flux.just(input);
    }
    
    // code using the recursive method
    List<FluxProvider> fluxProviders = new ArrayList<>();
    fluxProviders.add(fooEnhancer::enhance);
    fluxProviders.add(barEnhancer::enhance);
    fluxProviders.add(bazEnhancer::enhance);
    
    cascadingSwitchMap(Result.empty(), fluxProviders, 0)
            .subscribe(result -> doSomethingWith(result));
    

    但也许有一个更优雅的解决方案使用项目反应器的操作员/功能。有人知道这样的功能吗?事实上,这个要求似乎并不奇怪,是吗?

    【讨论】:

      【解决方案2】:

      switchMap 在这里感觉不合适。如果您在Flux 管道被声明时已经拥有List&lt;Enhancer&gt;,为什么不应用一个接近于命令式风格的逻辑:

      List<Enhancer> enhancers = <ordered list of different Enhancer impls>;
      Mono<Result> resultMono = Mono.just(Result.empty)
      for (Enhancer enhancer : enhancers) {
          resultMono = resultMono.map(enhancer::enhance); //previousValue -> enhancer.enhance(previousValue)
      }
      return resultMono;
      

      这甚至可以在订阅时稍后执行,通过将上面的整个代码包装在 Mono.defer(() -&gt; {...}) 块中来实现增强器的更动态分辨率。

      【讨论】:

      • 使用Monomap 可能没问题。但是增强器返回Flux。因此,在循环内部我不能写resultFlux = resultFlux.map(enhancer::enhance);,而是必须使用flatMapswitchMap。我忘了提到通量应该在不同的线程中订阅,以便能够并行发出它们的事件。当一个通量发出一个事件时,后面的通量应该停止根据第一个输入计算其结果,但使用新输入重新开始。这种行为只能通过通量结合嵌套的switchMap 来实现,我认为。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-19
      • 2012-03-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-04-16
      相关资源
      最近更新 更多