【问题标题】:How does ktor websocket flow api works?ktor websocket flow api是如何工作的?
【发布时间】:2021-01-01 11:42:43
【问题描述】:

我正在使用 ktor 通过 websockets 进行服务器端开发。

文档向我们展示了这个使用传入通道的示例:

for (frame in incoming.mapNotNull { it as? Frame.Text }) {
    // some
}

但是mapNotNull 被标记为已弃用,而支持Flow。我应该如何使用这个 API,可能会出现什么问题?例如,Flow 是冷流。这意味着将在每个collect 上调用生产者函数。它如何在 websocket 的上下文中工作。它会在第二次collect 呼叫时重新打开,或者旧消息将在下一次collect 之后传递一次?如何收集N 消息,然后停止收集,然后再次收集?

提前致谢:)

【问题讨论】:

    标签: websocket server flow ktor


    【解决方案1】:

    我应该如何使用这个 API,可能会出现什么问题?

    我正在使用的以及我在文档某处的一个示例中看到的是在 ReceiveChannel 上调用的 consumeAsFlow() 方法。这是整个sn-p:

    webSocket("/websocket") { //this: DefaultWebSocketServerSession
        incoming
            .consumeAsFlow()
            .map { receive(it) }
            .collect()
    }
    

    尚未发现这种方法存在重大问题。您应该注意的一件事(但也适用于非流方法)是,如果您将其放入流中,那么它将破坏 WebSocket 连接,这通常不是您想要做的事情。可能值得考虑将整个内容包装在 try-catch 中。

    它会在第二次付费呼叫时重新打开,还是可能在下一次付费后发送一次旧消息?

    您甚至在开始使用流中的消息之前就打开了 websocket。您可以看到在webSocket() {} 内部,您处于DefaultWebSocketServerSession 的上下文中。这是您的连接管理。在您的流程中,您只是在消息到达时一一接收消息(在建立连接之后)。如果连接中断,那么你就没有流量了。需要重新建立它才能处理您的消息。该建立位由Route.webSocket() 方法完成。我建议您看一下它的 Javadoc。

    如果您希望在连接关闭后进行一些清理,您可以像这样添加finally 块:

    webSocket("/chat") {
        try {
            incoming
                .consumeAsFlow()
                .map { receive(it, client) }
                .collect()
        } finally {
            // cleanup
        }
    }
    

    简而言之:collect 每收到一条消息就会被调用一次。如果没有连接(或已断开),则不会调用 collect

    如何收集 N 条消息,然后停止收集,然后再次收集?

    这有什么用例?我认为您不应该在任何流程中执行此操作。您当然可以从流中take(n) 项,但您将无法再从流中获取更多内容。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-26
      • 1970-01-01
      • 2019-03-31
      • 2021-05-20
      • 2014-06-19
      相关资源
      最近更新 更多