我应该如何使用这个 API,可能会出现什么问题?
我正在使用的以及我在文档某处的一个示例中看到的是在 ReceiveChannel 上调用的 consumeAsFlow() 方法。这是整个sn-p:
webSocket("/websocket") { //this: DefaultWebSocketServerSession
incoming
.consumeAsFlow()
.map { receive(it) }
.collect()
}
尚未发现这种方法存在重大问题。您应该注意的一件事(但也适用于非流方法)是,如果您将其放入流中,那么它将破坏 WebSocket 连接,这通常不是您想要做的事情。可能值得考虑将整个内容包装在 try-catch 中。
它会在第二次付费呼叫时重新打开,还是可能在下一次付费后发送一次旧消息?
您甚至在开始使用流中的消息之前就打开了 websocket。您可以看到在webSocket() {} 内部,您处于DefaultWebSocketServerSession 的上下文中。这是您的连接管理。在您的流程中,您只是在消息到达时一一接收消息(在建立连接之后)。如果连接中断,那么你就没有流量了。需要重新建立它才能处理您的消息。该建立位由Route.webSocket() 方法完成。我建议您看一下它的 Javadoc。
如果您希望在连接关闭后进行一些清理,您可以像这样添加finally 块:
webSocket("/chat") {
try {
incoming
.consumeAsFlow()
.map { receive(it, client) }
.collect()
} finally {
// cleanup
}
}
简而言之:collect 每收到一条消息就会被调用一次。如果没有连接(或已断开),则不会调用 collect。
如何收集 N 条消息,然后停止收集,然后再次收集?
这有什么用例?我认为您不应该在任何流程中执行此操作。您当然可以从流中take(n) 项,但您将无法再从流中获取更多内容。