【问题标题】:Flux subscribed through SSE raise a cancel() event通过 SSE 订阅的 Flux 引发 cancel() 事件
【发布时间】:2018-07-24 01:52:25
【问题描述】:

我有一个 Spring Boot 2.0.0.M7 + Spring Webflux 应用程序,我在其中使用 Thymeleaf Reactive。

我注意到,在我的微服务上,当我调用端点以 SSE 模式(文本/事件流)返回数据通量时,即使已正确处理,也会在该通量上发生 cancel()。

例如,这是一个简单的控制器端点:

@GetMapping(value = "/posts")
public Flux<String> getCommunityPosts() {
    return Flux.just("A", "B", "C").log("POSTS");
}

这是我在 SSE 模式下请求时获得的订阅通量日志:

2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.842  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(A)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(B)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(C)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | onComplete()
2018-02-13 17:04:09.852  INFO 4281 --- [nio-9090-exec-4] POSTS : | cancel()

我们可以注意到 onComplete 之后的取消事件。当我通过经典 GET 请求调用同一个端点时,我没有这种行为。我怀疑这个取消事件会使客户端事件源(javascript)抛出一个 onError 事件。

这是特定于 SSE 的已知/想要的行为吗?

问题更新

我实际上在我的一些流上使用了 SSE,因为我有时需要我的事件源来获取 JSON 数据,而不是 Thymeleaf 已经处理的 HTML。我应该以其他方式吗?

我的实现基于此示例的最后一个方法:https://github.com/danielfernandez/reactive-matchday/blob/master/src/main/java/com/github/danielfernandez/matchday/web/controller/MatchController.java

但是,我可能错过了在之前的帖子中提供的一些信息。我使用 Tomcat 服务器(带有 M7 的 8.5.23),而不是 Netty 服务器。我强制使用 Tomcat,包括以下 Maven 依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

在示例项目中使用您的代码,这似乎会导致问题。

当我在 Netty 服务器上运行代码时,我得到了和你一样的结果:

2018-02-14 12:30:48.713  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:30:48.714  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(1)
2018-02-14 12:30:49.717  INFO 3060 --- [     parallel-2] reactor.Flux.ConcatMap.1 : onNext(a)
2018-02-14 12:30:49.739  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(31)
2018-02-14 12:30:50.731  INFO 3060 --- [     parallel-3] reactor.Flux.ConcatMap.1 : onNext(b)
2018-02-14 12:30:51.733  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onNext(c)
2018-02-14 12:30:51.735  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onComplete()

当我在 Tomcat 服务器上运行相同的代码时,我遇到了取消问题:

2018-02-14 12:33:18.294  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:33:18.295  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:19.295  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : onNext(a)
2018-02-14 12:33:19.297  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : onNext(b)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onNext(c)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.307  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onComplete()
2018-02-14 12:33:21.307  INFO 3088 --- [nio-8080-exec-4] reactor.Flux.ConcatMap.2 : cancel()

可能是 Tomcat 问题还是我做错了什么?

【问题讨论】:

  • 您解决了这个问题吗?我有同样的问题(在 Tomcat 上)。

标签: java spring spring-boot project-reactor spring-webflux


【解决方案1】:

首先,我认为您不应该将 SSE 用于有限流。

当我创建一个控制器方法时:

@GetMapping(path = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<String> test() {
    return Flux.just("a", "b", "c").delayElements(Duration.ofSeconds(1)).log();
}

并从浏览器(Chrome 或 Firefox)请求它:

<script type="text/javascript">
    var testEventSource = new EventSource("/test");
    testEventSource.onmessage = function (e) {
        console.log(e);
    };
</script>

我在服务器上得到以下日志:

| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()
| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()

Flux 完成后,服务器立即关闭连接,浏览器自动重新连接。这将一遍又一遍地重播相同的序列。

我在服务器上获得cancel() 事件的唯一方法是在直播期间关闭浏览器选项卡。

【讨论】:

  • 我根据您的代码使用新信息编辑了我的初始帖子,我忘了提及我使用的是 Tomcat 服务器而不是 Netty 服务器。当我的代码在 Tomcat 服务器上运行时,我实际上只重现了最终的 cancel() 事件。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-09-12
  • 1970-01-01
  • 1970-01-01
  • 2021-03-26
  • 2022-08-15
  • 2021-12-08
  • 1970-01-01
相关资源
最近更新 更多