【问题标题】:Akka Streams stop stream after process n elementsAkka Streams 在处理 n 个元素后停止流
【发布时间】:2017-04-26 02:08:59
【问题描述】:

我有 Akka Stream 流,它正在使用 alpakka 从文件中读取、处理数据并写入文件。我想在处理 n 个元素后停止流程,计算持续时间并调用系统终止。我怎样才能实现它?

我的流程是这样的:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._

 sourceFile ~> mainFlow ~> sinkFile

ClosedShape
})

graph.run()

你有什么想法吗?谢谢

【问题讨论】:

    标签: scala akka-stream


    【解决方案1】:

    同意@Viktor的说法,首先你不需要使用graphDSL来实现,你可以使用take(n)来完成图。

    其次,您可以使用 mapMaterializedValue 将回调附加到您的 Sink 物化值(这反过来应该物化为 Future[Something])。

      val graph: RunnableGraph[Future[FiniteDuration]] =
        sourceFile
          .via(mainFlow)
          .take(N)
          .toMat(sinkFile)(Keep.right)
          .mapMaterializedValue { f ⇒
            val start = System.nanoTime()
            f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
          }
    
      graph.run().onComplete { duration ⇒
        println(s"Elapsed time: $duration")
      }
    

    请注意,您将需要一个ExecutionContext 在范围内。

    编辑

    即使您必须使用 graphDSL,同样的概念也适用。您只需要公开您的接收器的具体化Future 并在其上映射。

      val graph: RunnableGraph[Future[??Something??]] = 
        RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
        import GraphDSL.Implicits._
    
        sourceFile ~> mainFlow ~> snk
    
        ClosedShape
      })
    
      val timedGraph: RunnableGraph[Future[FiniteDuration]] = 
        graph.mapMaterializedValue { f ⇒
          val start = System.nanoTime()
          f.map(_ ⇒ FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
        }
    
      timedGraph.run().onComplete { duration ⇒
        println(s"Elapsed time: $duration")
      }
    

    【讨论】:

    • 抱歉,我没有具体说明。我的流程更复杂,不仅是:sourceFile ~> mainFlow ~> sinkFile。我写它就像一个抽象。我的流程看起来像这样:sourceFile ~> bcastA ~> FlowA ~> merge ~> FlowC ~> bcastA ~> SinkA bcastA ~> FlowB ~> merge ~> bcastA ~> SinkB 所以我必须使用 GraphDSL。再次抱歉造成混乱。
    • 抱歉格式化,我不知道格式化是否正确。重要的是我有多个流与广播、合并和多个接收器。
    【解决方案2】:

    这里不需要 GraphDSL。

    val doneFuture = (sourceFile via mainFlow.take(N) runWith sinkFile) transformWith { _ => system.terminate() }
    

    获取时间可以使用akka-streams-contrib:https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-11-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多