【问题标题】:Stream from WebClient into Flux. Blocking timeout throws exception从 WebClient 流式传输到 Flux。阻塞超时抛出异常
【发布时间】:2020-10-01 17:27:55
【问题描述】:

我在 Spring Boot 应用程序中使用 WebClient 调用流式 API。

我想检索元素,直到我收到 10 个元素,或者 10 秒过去了。我希望请求被阻止,直到其中任何一个先发生。

        WebClient client = WebClient.builder().baseUrl(URL).build();

        List<Item> items = client
                .get()
                .retrieve()
                .bodyToFlux(Item.class)
                .limitRequest(10)
                .collectList()
                .block(Duration.ofSeconds(10));

如果在超时之前检索到 10 个项目,调用会很好地返回,并且我有一个包含 10 个项目的填充列表。

但是,如果先超时,则会引发以下异常,并且不会返回任何项目。

java.lang.IllegalStateException: Timeout on blocking read for 10000 MILLISECONDS

如何读取最长 x 秒的流,然后使用 WebClient 返回检索到的项目?

【问题讨论】:

    标签: spring-boot reactive-programming spring-webflux project-reactor spring-reactive


    【解决方案1】:

    我想检索元素,直到我收到 10 个元素,或者 10 秒过去了。

    听起来像 bufferTimeout() 正是你所追求的。

    将传入的值收集到多个 List 缓冲区中,每次缓冲区达到最大大小或 maxTime Duration 过去时,返回的 Flux 将发出这些缓冲区。

    您只需要这些缓冲区之一。在响应式上下文中,您只需在生成的通量上调用 next() - 因为您只想阻止,您可以调用 blockFirst()

    类似:

    List<Item> items = client
            .get()
            .retrieve()
            .bodyToFlux(Item.class)
            .bufferTimeout(10, Duration.ofSeconds(10)) //first parameter is max number of elements, second is timeout
            .blockFirst();
    

    【讨论】:

    • .next().block() 可以替换为blockFirst()
    • 太棒了。 bufferTimeout() 确实是我一直在寻找的,再加上 blockFirst()
    猜你喜欢
    • 2017-11-16
    • 2018-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-20
    • 2012-12-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多