【问题标题】:How can I merge an arbitrary number of sources in Akka stream?如何在 Akka 流中合并任意数量的源?
【发布时间】:2018-06-12 20:43:47
【问题描述】:

我有 n 想在 Akka 流中按优先级合并的源。我的实现基于GraphMergePrioritiziedSpec,其中合并了三个优先源。我试图用以下方法抽象出Sources 的数量:

import akka.NotUsed
import akka.stream.{ClosedShape, Graph, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import org.apache.activemq.ActiveMQConnectionFactory

class SourceMerger(
  sources: Seq[Source[java.io.Serializable, NotUsed]],
  priorities: Seq[Int],
  private val sink: Sink[java.io.Serializable, _]
) {

  require(sources.size == priorities.size, "Each source should have a priority")

  import GraphDSL.Implicits._

  private def partial(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>

      val merge = b.add(MergePrioritized[java.io.Serializable](priorities))

      sources.zipWithIndex.foreach { case (s, i) =>
        s.shape.out ~> merge.in(i)
      }

      merge.out ~> sink
      ClosedShape
  }

  def merge(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))

  def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
}

但是,运行以下存根时出现错误:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{Matchers, WordSpecLike}
import akka.testkit.TestKit

import scala.collection.immutable.Iterable

class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {

  implicit val materializer: Materializer = ActorMaterializer()

  "A SourceMerger" should {
    "merge by priority" in {

      val priorities: Seq[Int] = Seq(1,2,3)

      val highPriority = Iterable("message1", "message2", "message3")
      val mediumPriority = Iterable("message4", "message5", "message6")
      val lowPriority = Iterable("message7", "message8", "message9")

      val source1 = Source[String](highPriority)
      val source2 = Source[String](mediumPriority)
      val source3 = Source[String](lowPriority)

      val sources = Seq(source1, source2, source3)

      val subscriber = Sink.seq[java.io.Serializable]

      val merger = new SourceMerger(sources, priorities, subscriber)

      merger.run()

      source1.runWith(Sink.foreach(println))
    }
  }

}

相关的stacktrace在这里:

[StatefulMapConcat.out] is already connected
java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
    at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
    at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
    at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
    at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:26)
    at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:25)

似乎错误来自这个:

sources.zipWithIndex.foreach { case (s, i) =>
  s.shape.out ~> merge.in(i)
}

是否可以在 Akka 流 Graph DSL 中合并任意数量的 Sources?如果是这样,为什么我的尝试没有成功?

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    代码示例的主要问题

    问题中提供的代码 sn-p 的一个大问题是 source1 连接到来自 merge 调用和 Sink.foreach(println)Sink。如果没有intermediate fan-out element,同一个Source 不能连接到多个Sink。

    删除Sink.foreach(println) 可能会彻底解决您的问题。

    简化设计

    基于来自特定Source 的所有消息具有相同优先级这一事实,可以简化合并。这意味着您可以按各自的优先级对源进行排序,然后将它们连接在一起:

    private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
                        priorities: Seq[Int],
                        sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] = 
       sources.zip(priorities)
              .sortWith(_._2 < _._2)
              .map(_._1)
              .reduceOption(_ ++ _)
              .getOrElse(Source.empty[java.io.Serializable])
              .to(sink)
    

    【讨论】:

    • 这看起来很不错。我特别喜欢避免使用MergePrioritized。使用您的构造,如果我有一个具有所有低优先级消息的当前视图的组合源,那么当它进入时,一个高优先级消息会被推到前面,这是真的吗?我正在尝试为此制定一个测试,但我对 Akka 的流 API 很陌生。
    • @erip 因为我的实现使用了串联,所以在从较低优先级的源中提取任何元素之前,必须先耗尽优先级较高的源,流结束。因此,您描述的场景是不可能的……
    • 嗯。有可能吗?
    • @erip 否。在您的示例代码中,source1 必须在处理来自source2 的任何元素之前完全耗尽。在source1 发送流结束信号之前,source2 将处于休眠状态。
    • Akka 流不支持合并无限流;即,这个来源是空的,但没有终止——从下一个来源消费?嗯。
    【解决方案2】:

    如果我替换你的代码运行没有错误

      sources.zipWithIndex.foreach { case (s, i) =>
        s.shape.out ~> merge.in(i)
      }
    

      sources.zipWithIndex.foreach { case (s, i) =>
        s ~> merge.in(i)
      }
    

    我承认我不太清楚为什么!无论如何,s.shapeStatefulMapConcat,这就是它抱怨输出端口已经连接的地方。即使您只传递一个来源,也会出现问题,因此任意数字不是问题。

    【讨论】:

    • 我不知道我是怎么错过的!
    猜你喜欢
    • 2012-03-13
    • 2019-11-16
    • 2015-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-25
    • 1970-01-01
    • 2015-01-31
    相关资源
    最近更新 更多