【问题标题】:WebSocketClient frozen when connecting to WebSocket (Spring WebFlux)连接到 WebSocket 时 WebSocketClient 冻结(Spring WebFlux)
【发布时间】:2020-11-05 23:43:06
【问题描述】:

我已经编写了一个示例来演示使用 WebSocket 协议的客户端/服务器通信。

服务器代码:

@SpringBootApplication
class WebSocketServerApplication {

    @Bean
    fun webSocketMapping(mapper: ObjectMapper): HandlerMapping? {
        val map = mapOf("/ws/messages" to ChatSocketHandler(mapper))
        val simpleUrlHandlerMapping = SimpleUrlHandlerMapping().apply {
            urlMap = map
            order = 10
        }
        return simpleUrlHandlerMapping
    }

    @Bean
    fun handlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()
}

fun main(args: Array<String>) {
    runApplication<WebSocketServerApplication>(*args)
}

class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
    val sink = Sinks.replay<Message>(100);
    val outputMessages: Flux<Message> = sink.asFlux();

    override fun handle(session: WebSocketSession): Mono<Void> {
        println("handling WebSocketSession...")
        session.receive()
                .map { it.payloadAsText }
                .map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
                .doOnNext { println(it) }
                .subscribe(
                        { message: Message -> sink.next(message) },
                        { error: Throwable -> sink.error(error) }
                );

        return session.send(
                Mono.delay(Duration.ofMillis(100))
                        .thenMany(outputMessages.map { session.textMessage(toJson(it)) })

        )

    }

    fun toJson(message: Message): String = mapper.writeValueAsString(message)

}

data class Message @JsonCreator constructor(
        @JsonProperty("id") var id: String? = null,
        @JsonProperty("body") var body: String,
        @JsonProperty("sentAt") var sentAt: Instant = Instant.now()
)

我提供了一个用Angular写的客户端,效果很好,代码是here

尝试测试服务器时。

@SpringBootTest()
class WebsocketServerApplicationTests {

    lateinit var client: WebSocketClient;

    @Autowired
    lateinit var mapper: ObjectMapper;

    @BeforeEach
    fun setup() {
        this.client = ReactorNettyWebSocketClient()
    }

    @Test
    fun contextLoads() {
        val replay = Sinks.replay<Message>(10)

        client.execute(
                URI("ws://localhost:8080/ws/messages")
        ) { session: WebSocketSession ->
            println("Starting to send messages")
            session.receive()
                    .map { mapper.readValue(it.payloadAsText, Message::class.java) }
                    .subscribe { replay.next(it) }

            session.send(
                    Mono.delay(Duration.ofSeconds(1)).thenMany(
                            Flux.just("test message", "test message2")
                                    .map(session::textMessage)
                    )
            ).then()
        }.subscribe()

        StepVerifier.create(replay.asFlux().takeLast(2))
                .consumeNextWith { it -> assertThat(it.body).isEqualTo("test message") }
                .consumeNextWith { it -> assertThat(it.body).isEqualTo("test message2") }
                .verifyComplete()
    }
}

启动应用程序时,运行测试,它被冻结,无法按预期工作。

【问题讨论】:

    标签: spring spring-boot spring-webflux project-reactor spring-websocket


    【解决方案1】:

    问题出在测试端。

    完成您的直播以使 takeLast(n) 正常工作

    首先,您希望从流中获取最后 2 个元素。但是,当且仅当存在onComplete 信号时才会发生这种情况,这会让Flux.takeLast 知道流的结尾,因此最后观察到的n 元素是last.

    在您的代码中,您收听WebsocketInbound 消息并将它们发送到ReplaySink。然而,FluxSink#complete 消息永远不会被调用,这意味着takeLast(2) 将像预期的那样永远挂起。

    解决方案

    一方面,解决方案似乎很明显:

    session.receive()
           .map { mapper.readValue(it.payloadAsText, Message::class.java) }
           .subscribe ({ replay.next(it) }, { replay.error(it) }, { replay.complete() })
    

    但是,可能有一个技巧:
    .receive 仅在 WebSocket 连接关闭时发送终端信号。

    因此,为了接收终端信号,请确保服务器关闭其一侧的连接。否则,测试仍然会挂起,等待最终的终端信号。

    如果不需要关闭连接,请尝试简单地使用.take(2)

    【讨论】:

      【解决方案2】:

      在阅读了一些关于 stackoverflow 的帖子和在 spring 框架中测试响应式 WebSocket 的源代码后,我自己终于解决了这个问题。

      @SpringBootTest()
      class WebSocketServerApplicationTests {
      
          lateinit var client: WebSocketClient
      
          @Autowired
          lateinit var mapper: ObjectMapper
      
          @BeforeEach
          fun setup() {
              this.client = ReactorNettyWebSocketClient()
          }
      
         @Test
          fun contextLoads() {
              val replay = Processors.replay<Message>(100)
              try {
                  client.execute(
                          URI("ws://localhost:8080/ws/messages")
                  ) { session: WebSocketSession ->
      
                      val receiveMono = session.receive()
                              .map { mapper.readValue(it.payloadAsText, Message::class.java) }
                              .log("received from server::")
                              .subscribeWith(replay)
                              .then()
                      session
                              .send(
                                      Mono.delay(Duration.ofMillis(500)).thenMany(
                                              Flux.just("test message", "test message2")
                                                      .map(session::textMessage)
                                      )
                              )
                              .then(receiveMono)
                  }.block(Duration.ofSeconds(5L))
      
                  // assert
                  assertThat(replay.blockLast(Duration.ofSeconds(5L))?.body).isEqualTo("test message2")
              } catch (e: Exception) {
                  println(e.message)
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2022-08-04
        • 2020-03-12
        • 2018-05-07
        • 2020-03-15
        • 2015-09-09
        • 2022-08-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多