【问题标题】:Spring WebFlux Flux behavior with non streaming application/jsonSpring WebFlux Flux 行为与非流式应用程序/json
【发布时间】:2019-07-07 20:46:53
【问题描述】:

我正在评估使用 Spring Webflux,但我们必须支持期望 application/json,而不是 application/stream+json 的客户端。我不清楚 Spring WebFlux 在客户端需要 application/json 的情况下如何处理序列化 Flux。

如果 Flux 被序列化为 application/json 而不是 application/stream+json 是阻塞操作吗?

下面我整理了一个示例控制器来演示我所看到的。当流是无限的并产生 application/json 时,浏览器不会返回任何内容。这似乎是合理的,因为它可能正在等待流终止。当流是无限的并产生 application/stream+json 时,我会按预期在浏览器中连续看到 JSON 对象。当 Flux 是有限的,比如 100 个元素,并且类型是 application/json 时,它会立即按预期呈现。 问题是,在序列化之前是否必须等待 Flux 终止,这是否会导致阻塞操作。返回正常的应用程序/json 时,使用 Flux 对性能和可扩展性有何影响?

@RestController
public class ReactiveController {

    /* Note: In the browser this sits forever and never renders */
    @GetMapping(path = "/nonStreaming", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> getPeopleNonStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This renders in the browser in chunks forever */
    @GetMapping(path = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> getPeopleStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This returns, but I can't tell if it is done in a non blocking manner. It
     * appears to gather everything before serializing. */
    @GetMapping(path = "/finiteFlux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> finiteFlux() {
        return Flux.range(0, 100)
                .map(tick -> new Person("Dude", "Dude", tick));
    }
}

更新:

我在下面添加了额外的日志信息:

流似乎使用了两个不同的线程

2019-02-13 16:53:07.363 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [dac80fd4] HTTP GET "/streaming"
2019-02-13 16:53:07.384 DEBUG 3416 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : [dac80fd4] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.getPeopleStreaming()
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/stream+json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/stream+json]
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [dac80fd4] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:53:07.532 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@6b3e843d]
2019-02-13 16:53:07.566 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;q=0.8;charset=UTF-8
2019-02-13 16:53:07.591 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.629 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@217d62db]
2019-02-13 16:53:07.630 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.732 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@741c0c88]
2019-02-13 16:53:07.732 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.832 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@7b8532e5]

虽然有限的 JSON 只使用一个线程。

2019-02-13 16:55:34.431 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] HTTP GET "/finiteFlux"
2019-02-13 16:55:34.432 DEBUG 3416 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : [5b048f46] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.finiteFlux()
2019-02-13 16:55:34.434 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/json]
2019-02-13 16:55:34.435 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [5b048f46] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:55:34.439 DEBUG 3416 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonEncoder  : [5b048f46] Encoding [[io.jkratz.reactivedemo.Person@425c8296, io.jkratz.reactivedemo.Person@22ae73df, io.jkratz.reactived (truncated)...]
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json;q=0.8;charset=UTF-8
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object 
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] Completed 200 OK
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Last HTTP response frame
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object EmptyLastHttpContent
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Decreasing pending responses, now 0
2019-02-13 16:55:34.451 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] No ChannelOperation attached. Dropping: EmptyLastHttpContent

【问题讨论】:

    标签: java spring spring-webflux reactor


    【解决方案1】:

    在流式 mimetype (application/stream+json) 的情况下,Spring WebFlux 中默认配置的 JSON 编解码器将序列化为 JSON,并在网络上刷新Flux 输入的每个元素。当流是无限的,或者当您想在信息可用时立即将信息推送到客户端时,此行为很方便。请注意,这有性能成本,因为调用序列化程序并多次刷新会占用资源。

    在非流式(application/json)的情况下,Spring WebFlux中默认配置的JSON编解码器将序列化为JSON并一次性刷入网络。它将在内存中缓冲Flux&lt;YourObject&gt; 并一次将其序列化。 这并不意味着操作是阻塞的,因为生成的Flux&lt;Databuffer&gt; 是以反应方式写入网络的。这里没有任何阻碍。

    这只是“流式传输数据和使用更多资源”与“更有效地缓冲和使用资源”之间的权衡。

    在流式传输的情况下,事情更有可能由不同的工作线程处理,因为工作项在不同的时间间隔可用。在简单 JSON 响应的情况下 - 它也可能由一个或多个线程处理:它取决于有效负载大小,远程客户端是否缓慢。

    【讨论】:

    • 旁注:APPLICATION_STREAM_JSON_VALUE 从现在开始就是@Deprecated
    • @membersound 但APPLICATION_NDJSON_VALUE 不是并且行为相同
    【解决方案2】:

    似乎所有的魔法都发生在 AbstractJackson2Encoder#encode 方法中。 这是常规application/json序列化的代码:

    // non-streaming
    
    return Flux.from(inputStream)
                .collectList() // make Mono<List<YourClass>> from Flux<YourClass>
                .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) // serialize list to JSON and write to DataBuffer
                .flux(); // get Flux<DataBuffer> from Mono<DataBuffer>
    
    

    所以,是的,它在序列化之前等待 Flux 终止。

    性能改进值得怀疑,因为它总是必须等待所有数据序列化。所以在 application/json 媒体类型的情况下,Flux 和常规 List 之间没有太大区别

    【讨论】:

      猜你喜欢
      • 2021-06-22
      • 1970-01-01
      • 2019-01-11
      • 2018-11-22
      • 2020-12-23
      • 2018-12-12
      • 1970-01-01
      • 2021-02-06
      • 2019-03-26
      相关资源
      最近更新 更多