【问题标题】:Waiting for a client websocket flow to connect before connecting source and sink在连接源和接收器之前等待客户端 websocket 流连接
【发布时间】:2018-10-15 17:09:16
【问题描述】:

我正在使用 akka-streams 来设置客户端 Web 套接字。我正在尝试将设置封装在具有以下签名的方法中:

def createConnectedWebSocket(url: String): Flow[Message, Message, _]

很清楚如何创建网络套接字流但尚未连接:

val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(url))

我首先想Await升级响应future然后返回socket流。然而,为了获得未来,我必须实现流程,为此我必须连接SourceSink。但这应该是其他一些适配器类的责任,例如序列化和反序列化 json 对象并公开Flow[JsValue, JsValue, _] 的适配器类。它不应该担心连接丢失时可能会重新连接(一旦我设法编写它,这种行为将成为我方法的更复杂版本的一部分)。它应该只需要处理一个简单的Flow

我设法通过使用集线器实现了我想要的部分目标:

val mergeHubSource = MergeHub.source[Message](perProducerBufferSize = 16)
val broadcastHubSink = BroadcastHub.sink[Message](bufferSize = 16)

val ((messageSink, upgradeResponse), messageSource) =
  mergeHubSource
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(broadcastHubSink)(Keep.both)
    .run()

所以现在我有一个Source 和一个Sink,我可以将它们组合成一个Flow 并返回它。问题是,我对集线器功能不感兴趣。当我将Source 连接到生成的Flow 并关闭它时,这应该传播到套接字,即套接字应该关闭。使用 MergeHub 时,它保持打开状态以便能够接受新的来源。

这可能吗?我想我可以通过自定义演员来弥补差距,但感觉就像我在这里重新发明了一些可能已经以另一种形式实现的东西。

【问题讨论】:

    标签: scala websocket akka-stream akka-http


    【解决方案1】:

    我找到了使用SourceRefSinkRef 的解决方案。虽然它们旨在用于弥合两台机器之间的差距,但它们也可以在这里使用。

    val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
        Http().webSocketClientFlow(WebSocketRequest(someUrl))
    
    val (sinkRefFuture, sourceRefFuture) =
      StreamRefs.sinkRef[In]()
        .viaMat(f)(Keep.left)
        .toMat(StreamRefs.sourceRef[Out]())(Keep.both)
        .run()
    
    val flow = Flow.fromSinkAndSource(await(sinkRefFuture), await(sourceRefFuture))
    

    await() 的定义如下:

    def await[T, F <: T](f: Future[F]): T = Await.result(f, 3.seconds)
    

    话虽如此,我认为至少在我的情况下,实际上最好不要提前实现套接字。这样,无论谁使用它,都可以重新连接。我现在正在传递一个流工厂,它根据需要创建 Web 套接字 Flow 的新实例(可能只有我实现一次)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-12-10
      • 1970-01-01
      • 1970-01-01
      • 2015-05-22
      • 1970-01-01
      • 2018-07-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多