【Question title】: How to broadcast the received messages to two different flows
【Posted】: 2019-04-15 08:03:09
【Question】:

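I am using an Akka Streams WebSocket client to request data from, and receive data from, a WebSocket server. I would like to broadcast the data received from the WebSocket into two different flows. The following image should clarify the scenario:

As you can see in the image, each received message should be broadcast to two different flows, each ending in its own sink.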

The WebSocket client can be created as follows:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
          // ignore streamed text and binary messages instead of failing with a MatchError
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular HTTP request, the response status is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

【Question discussion】:

    Tags: scala akka akka-stream akka-http


    【Solution 1】:

    You can build a Sink from a SinkShape in the GraphDSL to get the required flow:

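    Sink.fromGraph(GraphDSL.create() {
      implicit b =>
        import GraphDSL.Implicits._ // brings the ~> operator into scope

        // fan each incoming message out to two branches
        val bcast = b.add(Broadcast[Message](2))
        val flow1 = b.add(Flow[Message].map(m => m)) // placeholder for branch 1 processing
        val flow2 = b.add(Flow[Message].map(m => m)) // placeholder for branch 2 processing
        val sink1 = b.add(Sink.foreach[Message](println))
        val sink2 = b.add(Sink.foreach[Message](println))
    
        bcast ~> flow1 ~> sink1
        bcast ~> flow2 ~> sink2
    
        // expose the broadcast's inlet as the inlet of the combined sink
        SinkShape(bcast.in)
    })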
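
    A graph created with the no-argument GraphDSL.create() materializes to NotUsed, so the Future[Done] of each Sink.foreach is not visible to the caller. If the completion signal is needed (for example for the closed value in the question), one option is to pass both sinks to GraphDSL.create and combine their materialized values. The following is only a minimal sketch under assumptions: the names BroadcastSinkWithCompletion, broadcastSink and demo are made up for illustration, and an in-memory Source stands in for the WebSocket flow.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BroadcastSinkWithCompletion {

  // Hypothetical helper (not part of Akka): fans messages out to two sinks.
  // The materialized Future[Done] completes once both branch sinks are done.
  def broadcastSink(
      sink1: Sink[Message, Future[Done]],
      sink2: Sink[Message, Future[Done]]
  )(implicit ec: ExecutionContext): Sink[Message, Future[Done]] =
    Sink.fromGraph(
      GraphDSL.create(sink1, sink2)((done1, done2) => done1.flatMap(_ => done2)) {
        implicit b => (s1, s2) =>
          import GraphDSL.Implicits._
          val bcast = b.add(Broadcast[Message](2))
          bcast ~> s1
          bcast ~> s2
          SinkShape(bcast.in)
      }
    )

  // Demo with an in-memory source instead of a WebSocket connection.
  // Returns how many messages each branch received.
  def demo(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-demo")
    // On Akka 2.6+ the implicit ActorSystem alone is enough as a materializer.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val count1 = new AtomicInteger(0)
    val count2 = new AtomicInteger(0)
    val sink = broadcastSink(
      Sink.foreach[Message] { _ => count1.incrementAndGet(); () },
      Sink.foreach[Message] { _ => count2.incrementAndGet(); () }
    )

    val messages = List[Message](TextMessage("a"), TextMessage("b"), TextMessage("c"))
    val closed: Future[Done] = Source(messages).runWith(sink)

    try {
      Await.result(closed, 5.seconds) // wait until both branches have completed
      (count1.get, count2.get)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = demo()
    println(s"branch 1 received $first messages, branch 2 received $second")
  }
}
```

    In the question's code, such a sink could take the place of incoming, for instance .toMat(broadcastSink(sinkA, sinkB))(Keep.both), where sinkA and sinkB are placeholders for your own sinks.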
    

    The complete WebSocket client code with a broadcasting sink is:

      // runs inside main; needs the question's imports plus akka.stream.SinkShape
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      import system.dispatcher
    
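      // sink that broadcasts every incoming message to two branches;
      // note that Sink.head cancels its branch after the first message, and that
      // mapMaterializedValue below swaps in a Future[Done] unrelated to the stream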
    
      val incomingSink = Sink.fromGraph(GraphDSL.create() {
        implicit b =>
          import GraphDSL.Implicits._
          val bcast = b.add(Broadcast[Message](2))
          val flow1 = b.add(Flow[Message].map(m => m))
          val flow2 = b.add(Flow[Message].map(m => m ))
          val sink1 = b.add(Sink.head[Message])
          val sink2 = b.add(Sink.head[Message])
    
          bcast ~> flow1 ~> sink1
          bcast ~> flow2 ~> sink2
    
          SinkShape(bcast.in)
      }).mapMaterializedValue(_ => Future(Done))
      // send this as a message over the WebSocket
      val outgoing = Source.single(TextMessage("hello world!"))
    
      // flow to use (note: not re-usable!)
      val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
    
      // the materialized value is a tuple:
      // upgradeResponse is a Future[WebSocketUpgradeResponse] that
      // completes or fails when the connection succeeds or fails;
      // closed is the Future[Done] supplied by mapMaterializedValue above,
      // so it does not signal stream completion
      val (upgradeResponse, closed) =
        outgoing
          .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
          .toMat(incomingSink)(Keep.both) // also keep the Future[Done]
          .run()
    
      // just like a regular HTTP request, the response status is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that the server supports WebSockets
      val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }
    
      // in a real application you would not side effect here
      connected.onComplete(println)
      closed.foreach(_ => println("closed"))
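
    For a plain two-way fan-out, GraphDSL is not strictly required: alsoToMat on a Flow attaches an additional sink and lets you keep its materialized value. The following is a rough sketch rather than a drop-in replacement: AlsoToBroadcast, broadcastTo and demo are hypothetical names, Sink.seq is used only so the result can be inspected, and an in-memory Source stands in for the WebSocket flow.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object AlsoToBroadcast {

  // Hypothetical helper (not part of Akka): every message goes to both sinks,
  // and both materialized values are returned as a pair.
  def broadcastTo[M1, M2](
      first: Sink[Message, M1],
      second: Sink[Message, M2]
  ): Sink[Message, (M1, M2)] =
    Flow[Message]
      .alsoToMat(first)(Keep.right) // keep the materialized value of the first sink
      .toMat(second)(Keep.both)     // pair it with the one of the second sink

  // Demo with an in-memory source instead of a WebSocket connection.
  // Returns the messages collected by each of the two sinks.
  def demo(): (immutable.Seq[Message], immutable.Seq[Message]) = {
    implicit val system: ActorSystem = ActorSystem("also-to-demo")
    // On Akka 2.6+ the implicit ActorSystem alone is enough as a materializer.
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val messages = List[Message](TextMessage("a"), TextMessage("b"))
    val (seen1, seen2) =
      Source(messages).runWith(broadcastTo(Sink.seq[Message], Sink.seq[Message]))

    try {
      (Await.result(seen1, 5.seconds), Await.result(seen2, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = demo()
    println(s"first sink saw ${first.size} messages, second sink saw ${second.size}")
  }
}
```

    With sinks that materialize a Future[Done], the pair returned here can be combined into a single completion signal for closed. GraphDSL remains the more flexible choice once more than two branches or other junctions are involved.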
    

    【Discussion】:
