【问题标题】:Akka Stream - Parallel runnable graphsAkka Stream - 并行可运行图
【发布时间】:2016-04-28 12:23:37
【问题描述】:

需要建议,我需要并行运行多个源图,例如,我创建了这个示例代码,我正在创建 10 个图并并行运行它们。

这是正确的方法还是我应该在一个图中创建多个源并在一个图中并行运行它们?

def createGraph(start: Int, end: Int, name: String) = {
  RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._
      val s = Source(start to end)
      val f = Flow[Int].map[String](x => x.toString)
      val sink = Sink.foreach[String](x => println(name + ":" + x))

      val t = builder.add(s)

      val flow1 = builder.add(f)

      t ~> flow1 ~> sink

      ClosedShape
  })
}


(1 to 10).map(x => createGraph(x, x + 10, "g" + x)).map(_.run())

谢谢 阿伦

【问题讨论】:

  • 为什么所有代码​​都相当于:Source(start to end).map(_.toString).runForeach(x => println(s"$name:$x"))
  • 我从主代码中取出了一些复杂的流程处理来维护知识产权。问题陈述是,如果我有多个源信息并且需要为每个源运行图表,那么最佳实践是什么。例如,我们可以考虑多源,可以考虑读取多个 kafka 主题,使用数据库进行转换、处理和接收。
  • 这种方法对我来说有点奇怪,但我不能完全指出原因。我会分开时间把事情变成自己的阶段,然后使用合并和/或平衡将它们联系在一起,形成一个单一的图表。然后运行那个图表,只运行一次。现在您正在创建 n 个“岛屿”。

标签: akka akka-stream


【解决方案1】:

我尝试使用 http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-parallelism.html 进行并行处理,这看起来不错,其中我的源不同但流和接收器相同。在下面的示例中,每个源都是模拟,将它们视为您正在从某个外部源作为流读取:

object TestParallelGraph extends App {

  implicit val system = ActorSystem("test")
  implicit val dispacher = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val listOfDifferentSource=List(1,2,3) //consider we have to read data from various sources


 def createGraph() = {
    RunnableGraph.fromGraph(GraphDSL.create() {
      implicit builder =>
        import GraphDSL.Implicits._

        val merge=builder.add(Merge[Int](listOfDifferentSource.length))

        val flow=builder.add(Flow[Int].map(_ + 10)) //just random flow to add 10

        //again as mentioned above creating source with different information to simulate
        Source(listOfDifferentSource.head*100 to 100* listOfDifferentSource.head+10) ~> merge ~> flow ~> Sink.foreach(println)

        for{
          i <- listOfDifferentSource.tail //for each other source
        }yield (Source(100*i to 100*i+10) ~> merge) 

        ClosedShape
    })
  }

  createGraph().run()
}

【讨论】:

  • 请注意,这不是并行运行,而是按顺序运行,您需要使用.async 标记异步边界,如您链接到的文档中所述。
  • 谢谢约翰。我正在使用 kaka-stream-and-http-experimental 2.0.3 ,所以我认为 mapAsync 应该注意这一点。
  • 我的代码会变成 val flow=builder.add(Flow[Int].mapAsync(parallelism = 10)(_ + 10))
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-04-17
  • 1970-01-01
  • 1970-01-01
  • 2017-09-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多