【问题标题】:Akka Streams split stream by typeAkka Streams 按类型拆分流
【发布时间】:2016-11-05 00:23:28
【问题描述】:

我有以下简单的案例类层次结构:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

我有一个Flow[Message, Message, NotUsed](来自基于 Websocket 的协议,编解码器已经到位)。

我想将此Flow[Message] 解复用为 Foo 和 Baz 类型的单独流,因为它们由完全不同的路径处理。

最简单的方法是什么?应该很明显,但我错过了一些东西......

【问题讨论】:

    标签: scala akka akka-stream reactive-streams


    【解决方案1】:

    一种方法是使用创建一个包含每种消息类型的流的 RunnableGraph。

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    
      val in = Source(...)  // Some message source
      val out = Sink.ignore
    
      val foo = builder.add(Flow[Message].map (x => x match { case f@Foo(_) => f }))
      val baz = builder.add(Flow[Message].map (x => x match { case b@Baz(_) => b }))
      val partition = builder.add(Partition[Message](2, {
        case Foo(_) => 0
        case Baz(_) => 1
      }))
    
      partition ~> foo ~> // other Flow[Foo] here ~> out
      partition ~> baz ~> // other Flow[Baz] here ~> out
    
      ClosedShape
    }
    
    g.run()
    

    【讨论】:

    猜你喜欢
    • 2015-07-31
    • 2014-10-27
    • 1970-01-01
    • 2021-09-08
    • 2016-10-20
    • 2020-02-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多