【问题标题】:How do you process a sequence of Akka Stream Sources?你如何处理一系列 Akka 流源?
【发布时间】:2020-01-24 10:32:22
【问题描述】:

我们有一个可以处理事件的Sink

def parseEvent(): Sink[T, Future[akka.Done]] = {
  Sink.foreach[T] { event => {
    // Do stuff with the event
  }}
}

这适用于单个Source

val mySource: Source[T] = ...  
mySource.takeWhile( someCheck, true ).runWith(parseEvent)

如果你有,你如何让它工作:

val mySources: Seq[Source[T]] = ...

所有源都应该并行运行,所有事件都应该到达parseEvent

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    以下内容应该符合要求:

    import akka.NotUsed
    import akka.stream.scaladsl.{ Concat, Merge, Source }
    
    def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
      sources.size match {
        case s if s < 1 => Source.empty[T]
        case 1 => sources.head
        case 2 => sources.head.merge(sources(1))
        case _ => Source.combine(sources.head, sources(1), sources.drop(2): _*)(Merge(_))
      }
    

    合并策略“合并多个流,从输入流中获取元素”,并在多个流有可用元素时随机选择。背压从下游传播到上游。

    【讨论】:

    猜你喜欢
    • 2018-08-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-06
    • 1970-01-01
    • 1970-01-01
    • 2019-06-25
    相关资源
    最近更新 更多