【问题标题】:Reduce stream of Flux to Flux将通量减少到通量
【发布时间】:2018-12-22 13:41:11
【问题描述】:
我在Stream<Flux<T>> 上申请减少操作时遇到一些问题,我想将其减少到Flux<T>。每个 AdProvider 都以 Flux 的形式提供报价,我想使用流从每个 AdProvider 获取所有报价并将它们连接到一个管道。我怎么可能用 reduce 做到这一点?
Set<AdProvider> adProviders;
@Override
@LogBefore
public void gather()
{
adProviders
.parallelStream()
.map(this::gatherOffers)
.reduce(?)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}
【问题讨论】:
标签:
spring-boot
stream
reactive-programming
project-reactor
【解决方案1】:
使用Flux#fromStream() + Flux#flatMap() 展平Stream<Flux>
为了解决这个问题,您可以结合Flux#fromStream()(将Stream<Flux> 转换为Flux<Flux>)和Flux#flatMap()(将内部磁通扁平化为扁平Flux),如下例所示:
Set<AdProvider> adProviders;
@Override
public void gather()
{
Flux.fromStream(adProviders.stream())
.parallel() // replace .parallelStream with separate parallel + runOn
.runOn(Schedulers.parallel())
.flatMap(this::gatherOffers)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}
可能会注意到,我用普通的.stream 和parallel + runOn 替换了parallelStream,它们的作用几乎相同。
或者,您可以完全避免使用流,而只依赖Flux.fromIterble + 相同的Flux#flatMap:
Set<AdProvider> adProviders;
@Override
public void gather()
{
Flux.fromIterable(adProviders)
.parallel() // replace .parallelStream with separate parallel + runOn
.runOn(Schedulers.parallel())
.flatMap(this::gatherOffers)
.subscribe();
}
private Flux<Ad> gatherOffers(AdProvider adProvider)
{
try
{
return adProvider.offers();
}
catch(Exception e)
{
log.warn(EXCEPTION_WHILE_PROCESSING_OFFERS, adProvider.getClass().getSimpleName(), e);
return Flux.empty();
}
}