【问题标题】:Akka Streams filter & group by on a collection of keysAkka Streams 对一组键进行过滤和分组
【发布时间】:2016-10-06 06:10:41
【问题描述】:

我有一个流

case class Msg(keys: Seq[Char], value: String)

现在我想过滤一个键子集,例如 val filterKeys = Set[Char]('k','f','c')Filter(k.exists(filterKeys.contains))) 然后拆分这些,以便某些密钥由不同的流程处理,然后在最后合并在一起;

                                 /-key=k-> f1 --\
Source[Msg] ~> Filter ~> router |--key=f-> f2 ----> Merge --> f4
                                 \-key=c-> f3 --/

我该怎么做呢?

FlexiRoute 以旧的方式似乎是一个不错的选择,但在新的 API 中,我想我想要么制作一个自定义的 GraphStage,要么从 DSL 创建我自己的图表,因为我看不出有什么办法通过内置阶段做到这一点..?

【问题讨论】:

    标签: scala akka akka-stream reactive-streams


    【解决方案1】:

    小密钥集解决方案

    如果您的密钥集很小且不可变,那么广播和过滤器的组合可能是最容易理解的实现。您首先需要定义您描述的过滤器:

    def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains)
    

    然后可以按照in the documentation 的描述向广播公司提供信息。所有具有良好键的Msg 值将被广播到三个过滤器中的每一个,每个过滤器将只允许特定的键:

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
    
      val source : Source[Msg] = ???
    
      val goodKeyFilter = goodKeys(Set('k','f','c'))
    
      val bcast = builder.add(BroadCast[Msg](3))
      val merge = builder.add(Merge[Msg](3))
    
      val kKey = goodKeys(Set('k'))
      val fKey = goodKeys(Set('f'))
      val cKey = goodKeys(Set('c'))
    
      //as described in the question
      val f1 : Flow[Msg, Msg, _] = ???
      val f2 : Flow[Msg, Msg, _] = ???
      val f3 : Flow[Msg, Msg, _] = ???
    
      val f4 : Sink[Msg,_] = ???
    
      source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4
                                 bcast ~> fKey ~> f2 ~> merge
                                 bcast ~> cKey ~> f3 ~> merge
    

    大密钥集解决方案

    如果你的键集很大,那么 groupBy 更好。假设您有一个Map 的功能键:

    //e.g. 'k' -> f1
    val keyFuncs : Map[Set[Char], (Msg) => Msg]
    

    此地图可与 groupBy 功能一起使用:

    source
      .via(goodKeys(Set('k','f','c'))
      .groupBy(keyFuncs.size, _.keys)
      .map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg
      .mergeSubstreams
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-06-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-08-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多