【发布时间】:2018-08-18 14:12:24
【问题描述】:
为了允许对来自 CompletableFuture<Stream<String>> 的结果流进行多次迭代,我正在考虑以下方法之一:
通过
teams.thenApply(st -> st.collect(toList()))将生成的future转换为CompletableFuture<List<String>>将生成的 future 转换为
Flux<String>并使用缓存:Flux.fromStream(teams::join).cache();
Flux<T>是Publisher<T>在项目reactor中的实现。
用例:
我想从提供League 对象和Standing[] 的数据源中获取包含英超球队名称(例如Stream<String>)的序列(基于足球数据RESTful API,例如@987654321 @)。使用AsyncHttpClient 和Gson 我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
要重用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T> 不那么冗长,并提供了我需要的一切。然而,在这种情况下使用它是否正确?
或者我应该改用CompletableFuture<List<String>> 吗?或者还有其他更好的选择吗?
更新了一些想法(2018-03-16):
CompletableFuture<List<String>>:
- [PROS]
List<String>将继续收集,当我们需要继续处理未来的结果时,它可能已经完成。 - [CONS] 声明冗长。
- [缺点] 如果我们只想使用一次,那么我们不需要在
List<T>中收集这些项目。
Flux<String>:
- [PROS] 声明简洁
- [PROS] 如果我们只想使用一次,那么我们可以省略
.cache()并将其转发到下一层,这可以利用响应式 API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…} - [缺点] 如果我们想重用
Flux<T>,我们必须将其包装在可缓存的Flux<T>(….cache()) 中,这反过来会增加第一次遍历的开销,因为它必须存储结果项目在内部缓存中。
【问题讨论】:
-
“这反过来会增加第一次遍历的开销” - 可以忽略不计,忽略这个。
-
Flux是一个异步响应式管道。List是List。您需要什么?您正在将苹果与橙子进行比较。 -
@BoristheSpider 我没有将
List与Flux进行比较。我将CF<List>与Flux进行比较。 -
那是
Mono<List<T>>不是Flux<T>。应该很明显两者是不同的。 -
Mono<List<T>>与CF<List<T>>相同。从CF<List<T>>转换为Mono<List<T>>没有优势。
标签: java java-8 rx-java project-reactor completable-future