【发布时间】:2019-05-26 23:17:17
【问题描述】:
当一个分支在图中花费大量时间(异步)进行广播时,我很难理解 akka-stream 是否会对 Source 施加背压。
我尝试了buffer 和batch 来查看是否在源上施加了任何背压,但看起来不像。我也尝试过刷新System.out,但它并没有改变任何东西。
object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.tick(0 seconds, 1 seconds, 1)
in.runForeach(i => println("Produced " + i))
val out = Sink.foreach(println)
val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }
val bcast = builder.add(Broadcast[Int](2))
val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
case (s, v) => println(s"Batched ${s+v}"); s + v
}
val f2 = Flow[Int].map(_ + 10)
val f4 = Flow[Int].map { i => Thread.sleep(2000); i}
batchedIn ~> bcast ~> f2 ~> out
bcast ~> f4.async ~> out2
ClosedShape
})
g.run()
}
我希望在运行程序时在控制台中看到“Batched ...”,并且由于 f4 速度不够快而无法处理这些值,因此有时会暂时卡住它。目前,这些都没有达到预期的效果,因为这些数字是连续生成的,并且没有完成任何批处理。
编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它没有尽快发生,因为第一个元素应该发生背压
【问题讨论】:
标签: scala akka-stream backpressure