【问题标题】:Spring Web-Flux: How to return a Flux to a web client on request?Spring Web-Flux:如何根据请求将 Flux 返回给 Web 客户端?
【发布时间】:2017-12-13 22:12:07
【问题描述】:

我们正在使用 spring boot 2.0.0.BUILD_SNAPSHOT 和 spring boot webflux 5.0.0,目前我们无法根据请求将通量传输给客户端。

目前我正在从迭代器创建通量:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }
    });
}

根据要求,我只是在做:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
    return this.provider.getAllFlux();
}

当我现在在 10 秒后本地调用 localhost:8080/all 时,我得到一个 503 状态代码。当我使用WebClient 请求/all 时,也与客户端一样:

public Flux<ItemIgnite> getAllPoducts(){
    WebClient webClient = WebClient.create("http://localhost:8080");

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
    f.subscribe(System.out::println);
    return f;

}

什么都没有发生。没有数据传输。

当我改为执行以下操作时:

public Flux<List<ItemIgnite>> getAllFluxMono() {
    return Flux.just(this.getAllList());
}

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
    return this.provider.getAllFluxMono();
}

它正在工作。我猜是因为所有数据都已经完成加载并刚刚传输到客户端,因为它通常会在不使用通量的情况下传输数据。

我必须进行哪些更改才能让 Flux 将数据流式传输到请求这些数据的 Web 客户端?

编辑

我在ignite cache 中有数据。所以我的getAllIterator 正在从 ignite 缓存中加载数据:

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
    return this.igniteCache.iterator();
}

编辑

像@Simon Baslé 建议的那样添加flux.complete()

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete(); // see here
    });
}

解决了浏览器中的503 问题。但它并没有解决WebClient 的问题。仍然没有传输数据。

编辑 3

使用publishOnSchedulers.parallel()

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.<ItemIgnite>create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete();
    }).publishOn(Schedulers.parallel());
}

不改变结果。

在这里,我将 WebClient 收到的内容发布给您:

value :[Item ID: null, Product Name: null, Product Group: null]
complete

因此,他似乎获得了一项(超过 35.000 项)并且值为 null,他正在完成之后。

【问题讨论】:

  • 您能解释一下getAllIterator 在做什么吗?是阻塞吗?从数据库中读取数据?凭记忆?

标签: spring project-reactor spring-webflux


【解决方案1】:

跳出来的一件事是你永远不会在你的create 中调用flux.complete()

但实际上有一个工厂操作员专门用于将Iterable 转换为Flux,所以你可以这样做Flux.fromIterable(this)

编辑:如果您的 Iterator 隐藏了数据库请求(或任何阻塞 I/O)之类的复杂性,请注意这会带来麻烦:反应链中的任何阻塞,如果不是孤立的在使用 publishOn 的专用执行上下文中,不仅有可能阻塞整个链,而且其他响应式进程也很好(因为线程可以并且将被多个响应式进程使用)。

createfromIterable 都没有做任何特别的事情来防止阻塞源。从WebClient 的挂起情况来看,我认为您正面临这种问题。

【讨论】:

  • 添加flux.complete() 可以让我在浏览器中测试时获得预期的结果。我不再收到503。用WebClient试了还是没有结果。
  • 我看到了另外两个问题;您的 getAllIterator 正在阻塞,因此您需要在特定的调度程序上安排它(请参阅publishOn 上的反应器文档)。在 WebClient 端,您在 Flux 上调用 subscribe 并返回它 - 您不应该两者都做。
  • 我编辑了我的帖子,向您展示我是如何添加publishOn 并提供结果到达我的WebClient p。 s。订阅通量只是为了检查是否有输出到达。
【解决方案2】:

问题是我转移的对象ItemIgnite。系统 Flux 似乎无法处理这个问题。因为如果我将原始代码更改为以下内容:

public Flux<String> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue().toString());
        }
    });
}

一切正常。没有publishOn 和没有flux.complete()。也许有人知道为什么这是有效的。

【讨论】:

    猜你喜欢
    • 2018-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-16
    • 2023-03-19
    • 2018-11-10
    • 2021-05-09
    • 2018-08-14
    相关资源
    最近更新 更多