【问题标题】:Akka-streams backpressure on broadcast with async processingAkka-streams 通过异步处理对广播产生背压
【发布时间】:2019-05-26 23:17:17
【问题描述】:

当一个分支在图中花费大量时间(异步)进行广播时,我很难理解 akka-stream 是否会对 Source 施加背压。

我尝试了bufferbatch 来查看是否在源上施加了任何背压,但看起来不像。我也尝试过刷新System.out,但它并没有改变任何东西。

object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val in = Source.tick(0 seconds, 1 seconds, 1)
        in.runForeach(i => println("Produced " + i))

    val out = Sink.foreach(println)
    val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }

    val bcast = builder.add(Broadcast[Int](2))

    val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
        case (s, v) => println(s"Batched ${s+v}"); s + v
    }

    val f2 = Flow[Int].map(_ + 10)
    val f4 = Flow[Int].map { i => Thread.sleep(2000); i}

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4.async ~> out2
    ClosedShape
})

g.run()
}

我希望在运行程序时在控制台中看到“Batched ...”,并且由于 f4 速度不够快而无法处理这些值,因此有时会暂时卡住它。目前,这些都没有达到预期的效果,因为这些数字是连续生成的,并且没有完成任何批处理。

编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它没有尽快发生,因为第一个元素应该发生背压

【问题讨论】:

    标签: scala akka-stream backpressure


    【解决方案1】:

    解释此行为的原因是 akka 在设置异步边界时引入的内部缓冲区。

    Buffers for asynchronous operators

    在使用异步运算符时作为优化引入的内部缓冲区。


    虽然流水线一般会增加吞吐量,但实际上,通过异步(因此线程交叉)边界传递元素的成本非常高。为了摊销这个成本,Akka Streams 在内部使用了一个窗口化的批处理背压策略。它是窗口化的,因为与 Stop-And-Wait 协议相反,多个元素可能与元素请求同时“进行中”。它也是批处理,因为一旦从窗口缓冲区中排出一个元素,就不会立即请求一个新元素,而是 在排出多个元素后请求多个元素。这种批处理策略降低了通过异步边界传播背压信号的通信成本。

    我知道这是一个玩具流,但如果你解释你的目标是什么,我会尽力帮助你。

    你需要mapAsync 而不是async

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import akka.stream.scaladsl.GraphDSL.Implicits._
    
      val in = Source.tick(0 seconds, 1 seconds, 1).map(x => {println(s"Produced ${x}"); x})
    
      val out = Sink.foreach[Int]{ o => println(s"F2 processed $o") }
      val out2 = Sink.foreach[Int]{ o => println(s"F4 processed $o") }
    
      val bcast = builder.add(Broadcast[Int](2))
    
      val batchedIn: Source[Int, Cancellable] = in.buffer(4,OverflowStrategy.backpressure)
    
      val f2 = Flow[Int].map(_ + 10)
      val f4 = Flow[Int].mapAsync(1) { i => Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher) }
    
      batchedIn ~> bcast ~> f2 ~> out
      bcast ~> f4 ~> out2
      ClosedShape
    }).run()
    

    【讨论】:

    • 您好,谢谢!确实,您似乎找到了问题所在。
    • 我尝试做的是阻止流向f2,而f4 正在使用此处的背压进行处理。注意async是为了避免阻塞Thread.sleep上的主线程,依靠背压有效应对处理速度差异。是不是更清楚了?
    • 使用batch(4,identity),您想说:“如果 f4 跟不上速度,我希望 f2 最多处理 4 个元素”?
    • 完全正确:我也试过buffer(4, OverflowStrategy.backpressure),但它也不是我想要的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-09-08
    • 2021-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多