【发布时间】:2017-12-13 22:12:07
【问题描述】:
我们正在使用 spring boot 2.0.0.BUILD_SNAPSHOT 和 spring boot webflux 5.0.0,目前我们无法根据请求将通量传输给客户端。
目前我正在从迭代器创建通量:
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
});
}
根据要求,我只是在做:
@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
return this.provider.getAllFlux();
}
当我现在在 10 秒后本地调用 localhost:8080/all 时,我得到一个 503 状态代码。当我使用WebClient 请求/all 时,也与客户端一样:
public Flux<ItemIgnite> getAllPoducts(){
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
f.subscribe(System.out::println);
return f;
}
什么都没有发生。没有数据传输。
当我改为执行以下操作时:
public Flux<List<ItemIgnite>> getAllFluxMono() {
return Flux.just(this.getAllList());
}
和
@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
return this.provider.getAllFluxMono();
}
它正在工作。我猜是因为所有数据都已经完成加载并刚刚传输到客户端,因为它通常会在不使用通量的情况下传输数据。
我必须进行哪些更改才能让 Flux 将数据流式传输到请求这些数据的 Web 客户端?
编辑
我在ignite cache 中有数据。所以我的getAllIterator 正在从 ignite 缓存中加载数据:
public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
return this.igniteCache.iterator();
}
编辑
像@Simon Baslé 建议的那样添加flux.complete():
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
flux.complete(); // see here
});
}
解决了浏览器中的503 问题。但它并没有解决WebClient 的问题。仍然没有传输数据。
编辑 3
使用publishOn 和Schedulers.parallel():
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.<ItemIgnite>create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
flux.complete();
}).publishOn(Schedulers.parallel());
}
不改变结果。
在这里,我将 WebClient 收到的内容发布给您:
value :[Item ID: null, Product Name: null, Product Group: null]
complete
因此,他似乎获得了一项(超过 35.000 项)并且值为 null,他正在完成之后。
【问题讨论】:
-
您能解释一下
getAllIterator在做什么吗?是阻塞吗?从数据库中读取数据?凭记忆?
标签: spring project-reactor spring-webflux