【发布时间】:2020-11-05 23:43:06
【问题描述】:
我已经编写了一个示例来演示使用 WebSocket 协议的客户端/服务器通信。
服务器代码:
@SpringBootApplication
class WebSocketServerApplication {
@Bean
fun webSocketMapping(mapper: ObjectMapper): HandlerMapping? {
val map = mapOf("/ws/messages" to ChatSocketHandler(mapper))
val simpleUrlHandlerMapping = SimpleUrlHandlerMapping().apply {
urlMap = map
order = 10
}
return simpleUrlHandlerMapping
}
@Bean
fun handlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()
}
fun main(args: Array<String>) {
runApplication<WebSocketServerApplication>(*args)
}
class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
val sink = Sinks.replay<Message>(100);
val outputMessages: Flux<Message> = sink.asFlux();
override fun handle(session: WebSocketSession): Mono<Void> {
println("handling WebSocketSession...")
session.receive()
.map { it.payloadAsText }
.map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
);
return session.send(
Mono.delay(Duration.ofMillis(100))
.thenMany(outputMessages.map { session.textMessage(toJson(it)) })
)
}
fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
data class Message @JsonCreator constructor(
@JsonProperty("id") var id: String? = null,
@JsonProperty("body") var body: String,
@JsonProperty("sentAt") var sentAt: Instant = Instant.now()
)
我提供了一个用Angular写的客户端,效果很好,代码是here。
尝试测试服务器时。
@SpringBootTest()
class WebsocketServerApplicationTests {
lateinit var client: WebSocketClient;
@Autowired
lateinit var mapper: ObjectMapper;
@BeforeEach
fun setup() {
this.client = ReactorNettyWebSocketClient()
}
@Test
fun contextLoads() {
val replay = Sinks.replay<Message>(10)
client.execute(
URI("ws://localhost:8080/ws/messages")
) { session: WebSocketSession ->
println("Starting to send messages")
session.receive()
.map { mapper.readValue(it.payloadAsText, Message::class.java) }
.subscribe { replay.next(it) }
session.send(
Mono.delay(Duration.ofSeconds(1)).thenMany(
Flux.just("test message", "test message2")
.map(session::textMessage)
)
).then()
}.subscribe()
StepVerifier.create(replay.asFlux().takeLast(2))
.consumeNextWith { it -> assertThat(it.body).isEqualTo("test message") }
.consumeNextWith { it -> assertThat(it.body).isEqualTo("test message2") }
.verifyComplete()
}
}
启动应用程序时,运行测试,它被冻结,无法按预期工作。
【问题讨论】:
标签: spring spring-boot spring-webflux project-reactor spring-websocket