【问题标题】:Calculate the time stream takes to complete in akka stream with and without async计算在有和没有异步的情况下在 akka 流中完成的时间流
【发布时间】:2021-01-08 09:56:15
【问题描述】:

我想计算完成 akka 流所需的时间 object Demo 扩展 App {

  implicit val system       = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()
  val startTime = System.currentTimeMillis


  System.out.println(elapsedTime)
  val flowA = Flow[String].map { element ⇒
    println(s"Flow A : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
     element
  }

  val flowB = Flow[String].map { element ⇒
    println(s"Flow B : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  val flowC = Flow[String].map { element ⇒
    println(s"Flow C : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  import system.dispatcher
  val completion = Source(List("Java", "Scala", "C++"))
    .via(flowA)
    .via(flowB)
    .via(flowC)
    .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
  val stopTime = System.currentTimeMillis
  val elapsedTime = stopTime - startTime
  println(elapsedTime)
  completion.onComplete(_ => system.terminate())

输出

 0
113
Flow A : Java MyDemo-akka.actor.default-dispatcher-4
Flow B : Java MyDemo-akka.actor.default-dispatcher-4
Flow C : Java MyDemo-akka.actor.default-dispatcher-4
Got output Java
Flow A : Scala MyDemo-akka.actor.default-dispatcher-4
Flow B : Scala MyDemo-akka.actor.default-dispatcher-4
Flow C : Scala MyDemo-akka.actor.default-dispatcher-4
Got output Scala
Flow A : C++ MyDemo-akka.actor.default-dispatcher-4
Flow B : C++ MyDemo-akka.actor.default-dispatcher-4
Flow C : C++ MyDemo-akka.actor.default-dispatcher-4
Got output C++

查询

  1. 在流​​完成之前打印经过的时间113,原因不清楚。我想打印流完成处理后经过的时间
  2. 我们如何计算完成流处理所需的时间,因为我想比较使用.map 与用.async 替换.map 所用时间的结果

【问题讨论】:

  • 我不知道你问的是什么问题......
  • 我想按流计算完成的总时间,但我的输出打印流完成之前经过的时间@LeviRamsey
  • @LeviRamsey 编辑了我的问题以便更清楚

标签: asynchronous akka akka-stream


【解决方案1】:

运行流是异步的。对于

val completion =
   // omitted for brevity
   .runWith(Sink.foreach(s => println(s"Got output $s")))

completion 是一个Future[Done]Sink.foreach 的具体化值),当流成功完成时将使用Done(单例)完成(如果流失败,未来将失败)。实际上,该行代码已经完成,并且一旦流被具体化并开始执行,就会继续执行。

您只需将计算经过时间的代码移动到completion 上的onComplete 回调中即可获得所用时间的上限。

completion.onComplete { _ =>  // there's only one possible value here, so we don't need it
  val stopTime = System.currentTimeMillis()
  val elapsedTime = stopTime - startTime
  println(elapsedTime)
  system.terminate()
}

请注意,此回调将在流完成后的某个时间点执行,但不能保证它会立即执行。也就是说,只要您运行它的系统和 JVM 没有承受重负载,就足够了。

另外两点值得注意:

  • currentTimeMillis 真的不应该用于可靠的基准测试:它甚至不能保证是单调的(它可以倒退)。对于此目的,System.nanoTime 通常更可靠。
  • val completion = ??? 之前获取开始时间可能更现实,否则您也在测量构建流“蓝图”的时间,而不仅仅是具体化和运行流的时间。李>

【讨论】:

    【解决方案2】:

    我尝试建立一个图表来衡量流动时间,也许它可以帮助你。

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.FlowShape
    import akka.stream.scaladsl.{Flow, GraphDSL, Source, Unzip, Zip}
    
    object TimedFlow {
      def apply[In, Out](innerFlow: Flow[In, Out, NotUsed], func: (Long, Long) => Any): Flow[In, Out, NotUsed] = {
        val flowWithLong = Flow.fromGraph(GraphDSL.create() {
          implicit builder =>
            import akka.stream.scaladsl.GraphDSL.Implicits._
            val unzip = builder.add(Unzip[In, Long]())
            val zip = builder.add(Zip[Out, Long]())
    
            unzip.out0 ~> innerFlow ~> zip.in0
            unzip.out1 ~> zip.in1
            FlowShape(unzip.in, zip.out)
        })
    
        Flow[In]
          .map(in => (in, System.currentTimeMillis()))
          .via(flowWithLong)
          .via(Flow[(Out, Long)].map {
            case (out, beginTime) =>
              val endTime = System.currentTimeMillis()
              func(beginTime, endTime)
              out
          })
      }
    
      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("QuickStart")
        val source: Source[Int, NotUsed] = Source(1 to 100)
    
        implicit val ec = system.dispatcher
    
        val plusOneFlowWithTimePrint = TimedFlow(plusOneFlow(), (beginTime: Long, endTime: Long) => {
          println(s"begin ${beginTime} end ${endTime}")
          println(s"end - begin: ${endTime - beginTime}")
        })
        val done = source.via(plusOneFlowWithTimePrint).runForeach(println)
    
        done.onComplete(_ => system.terminate())
      }
    
      def plusOneFlow(): Flow[Int, Int, NotUsed] = {
        Flow[Int]
          .map {
            x =>
              Thread.sleep(50)
              x + 1
          }
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-24
      • 1970-01-01
      • 1970-01-01
      • 2018-12-05
      • 2019-12-16
      相关资源
      最近更新 更多