【发布时间】:2016-05-03 18:47:20
【问题描述】:
Akka Http 中的 Web Socket 连接被视为 Akka Streams Flow。这似乎对基本的请求-回复很有用,但是当消息也应该通过 websocket 推送时,它会变得更加复杂。我的服务器的核心看起来有点像:
lazy val authSuccessMessage = Source.fromFuture(someApiCall)
lazy val messageFlow = requestResponseFlow
.merge(updateBroadcastEventSource)
lazy val handler = codec
.atop(authGate(authSuccessMessage))
.join(messageFlow)
handleWebSocketMessages {
handler
}
这里,codec 是一个(反)序列化BidiFlow,authGate 是一个处理授权消息并防止任何消息流出直到授权成功的BidiFlow。成功后,它会发送authSuccessMessage 作为回复。 requestResponseFlow 是标准的请求-回复模式,updateBroadcastEventSource 混合了异步推送消息。
我希望能够在某些情况下发送错误消息并正常终止连接,例如授权错误、someApiCall 失败或requestResponseFlow 处理的错误请求。所以基本上,基本上看起来我希望能够用最后一条消息异步完成messageFlow,即使它的其他组成流仍然存在。
【问题讨论】:
标签: scala websocket akka-stream akka-http