【问题标题】:Reduce stream of Flux to Flux将通量减少到通量
【发布时间】:2018-12-22 13:41:11
【问题描述】:

我在Stream<Flux<T>> 上申请减少操作时遇到一些问题,我想将其减少到Flux<T>。每个 AdProvider 都以 Flux 的形式提供报价,我想使用流从每个 AdProvider 获取所有报价并将它们连接到一个管道。我怎么可能用 reduce 做到这一点?

Set<AdProvider> adProviders;

@Override
@LogBefore
public void gather()
{
    adProviders
        .parallelStream()
        .map(this::gatherOffers)
        .reduce(?)
        .subscribe();
}

private Flux<Ad> gatherOffers(AdProvider adProvider)
{
    try
    {
        return adProvider.offers();
    }
    catch(Exception e)
    {
        log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);

        return Flux.empty();
    }
}

【问题讨论】:

    标签: spring-boot stream reactive-programming project-reactor


    【解决方案1】:

    使用Flux#fromStream() + Flux#flatMap() 展平Stream&lt;Flux&gt;

    为了解决这个问题,您可以结合Flux#fromStream()(将Stream&lt;Flux&gt; 转换为Flux&lt;Flux&gt;)和Flux#flatMap()(将内部磁通扁平化为扁平Flux),如下例所示:

    Set<AdProvider> adProviders;
    
    @Override
    public void gather()
    {
        Flux.fromStream(adProviders.stream())
            .parallel() // replace .parallelStream with separate parallel + runOn
            .runOn(Schedulers.parallel())
            .flatMap(this::gatherOffers)
            .subscribe();
    }
    
    private Flux<Ad> gatherOffers(AdProvider adProvider)
    {
        try
        {
            return adProvider.offers();
        }
        catch(Exception e)
        {
            log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
    
            return Flux.empty();
        }
    }
    

    可能会注意到,我用普通的.streamparallel + runOn 替换了parallelStream,它们的作用几乎相同。

    或者,您可以完全避免使用流,而只依赖Flux.fromIterble + 相同的Flux#flatMap

    Set<AdProvider> adProviders;
    
    @Override
    public void gather()
    {
        Flux.fromIterable(adProviders)
            .parallel() // replace .parallelStream with separate parallel + runOn
            .runOn(Schedulers.parallel())
            .flatMap(this::gatherOffers)
            .subscribe();
    }
    
    private Flux<Ad> gatherOffers(AdProvider adProvider)
    {
        try
        {
            return adProvider.offers();
        }
        catch(Exception e)
        {
            log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
    
            return Flux.empty();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2021-03-09
      • 1970-01-01
      • 1970-01-01
      • 2019-10-13
      • 1970-01-01
      • 2019-12-30
      • 1970-01-01
      • 2021-05-15
      • 2015-07-18
      相关资源
      最近更新 更多