【问题标题】:Terminate Akka-Http Web Socket connection asynchronously异步终止 Akka-Http Web Socket 连接
【发布时间】:2016-05-03 18:47:20
【问题描述】:

Akka Http 中的 Web Socket 连接被视为 Akka Streams Flow。这似乎对基本的请求-回复很有用,但是当消息也应该通过 websocket 推送时,它会变得更加复杂。我的服务器的核心看起来有点像:

lazy val authSuccessMessage = Source.fromFuture(someApiCall)

lazy val messageFlow = requestResponseFlow
    .merge(updateBroadcastEventSource)

lazy val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)

handleWebSocketMessages {
  handler
}

这里,codec 是一个(反)序列化BidiFlowauthGate 是一个处理授权消息并防止任何消息流出直到授权成功的BidiFlow。成功后,它会发送authSuccessMessage 作为回复。 requestResponseFlow 是标准的请求-回复模式,updateBroadcastEventSource 混合了异步推送消息。

我希望能够在某些情况下发送错误消息并正常终止连接,例如授权错误、someApiCall 失败或requestResponseFlow 处理的错误请求。所以基本上,基本上看起来我希望能够用最后一条消息异步完成messageFlow,即使它的其他组成流仍然存在。

【问题讨论】:

    标签: scala websocket akka-stream akka-http


    【解决方案1】:

    想出了如何使用KillSwitch 来做到这一点。

    更新版本

    旧版本的问题是当由堆栈中更高的BidiFlow 阶段(例如我的authGate)触发时,它似乎不起作用。我不确定具体原因,但是将关闭建模为 BidiFlow 本身,放置在堆栈的更上方,解决了这个问题。

    val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
    
    /**
     * Shutoff valve for the connection. It is triggered when `shutoffPromise`
     * completes, and sends a final optional termination message if that
     * promise resolves with one.
     */
    val shutoffBidi = {
      val terminationMessageSource = Source
        .maybe[OutgoingWebsocketEvent]
        .mapMaterializedValue(_.completeWith(shutoffPromise.future))
    
      val terminationMessageBidi = BidiFlow.fromFlows(
        Flow[IncomingWebsocketEventOrAuthorize],
        Flow[OutgoingWebsocketEvent].merge(terminationMessageSource)
      )
    
      val terminator = BidiFlow
        .fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent])
        .mapMaterializedValue { killSwitch =>
          shutoffPromise.future.foreach { _ => println("Shutting down connection"); killSwitch.shutdown() }
        }
    
      terminationMessageBidi.atop(terminator)
    }
    

    然后我将它应用到codec

    val handler = codec
      .atop(shutoffBidi)
      .atop(authGate(authSuccessMessage))
      .join(messageFlow)
    

    旧版本

    val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()
    
    /**
     * Shutoff valve for the flow of outgoing messages. It is triggered when
     * `shutoffPromise` completes, and sends a final optional termination
     * message if that promise resolves with one.
     */
    val shutoffFlow = {
      val terminationMessageSource = Source
        .maybe[OutgoingWebsocketEvent]
        .mapMaterializedValue(_.completeWith(shutoffPromise.future))
    
      Flow
        .fromGraph(KillSwitches.single[OutgoingWebsocketEvent])
        .mapMaterializedValue { killSwitch =>
          shutoffPromise.future.foreach(_ => killSwitch.shutdown())
        }
        .merge(terminationMessageSource)
    }
    

    然后handler 看起来像:

    val handler = codec
      .atop(authGate(authSuccessMessage))
      .join(messageFlow via shutoffFlow)
    

    【讨论】:

    • 如果我可以问,为什么 shutdownPromise 是一个惰性验证?
    • @ViktorKlang 我的整个 Web 套接字处理程序都在 case class 中,由于初始化顺序,我遇到了崩溃,我不想担心。
    • 当然可以,但对于本示例,它不是必需的。 :)
    • 公平点,我或多或少只是剪切和粘贴了对我有用的东西。但无论如何它都是一个非编译片段,所以不妨简化一下。
    猜你喜欢
    • 1970-01-01
    • 2017-02-03
    • 2015-11-07
    • 2017-07-17
    • 1970-01-01
    • 2016-05-24
    • 1970-01-01
    • 1970-01-01
    • 2020-09-13
    相关资源
    最近更新 更多