【问题标题】:Akka Stream DSL graph KillSwitchAkka Stream DSL 图 KillSwitch
【发布时间】:2017-09-19 12:04:07
【问题描述】:

我正在使用 Akka Streams 并使用 Alpakka 从文件中流式传输内容。一段时间后我需要停止流,所以我想使用KillSwitch。但是我不知道怎么用,因为我用的是graph DSL。

我的图表如下所示:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  source ~> mainFlow ~> sink

  ClosedShape
})

graph.run()

我在这里找到了解决方案:How to abruptly stop an akka stream Runnable Graph?

但是,如果我使用图形 DSL,我不知道如何应用它。你能给我一些建议吗?

【问题讨论】:

    标签: scala stream akka-stream


    【解决方案1】:

    要在 GraphDSL 中显示物化值,您可以将物化到该值的阶段传递给 create 方法。用一个例子更容易解释。在你的情况下:

      val switch = KillSwitches.single[Int]
    
      val graph: RunnableGraph[UniqueKillSwitch] =
        RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
        import GraphDSL.Implicits._
    
        source ~> mainFlow ~> sw ~> sink
    
        ClosedShape
      })
    
      val ks = graph.run()
      ks.shutdown()
    

    【讨论】:

    • 好的,完美:) 我还有一个问题,你知道如何在处理 n 个元素后停止流吗?现在我正在使用 system.scheduler.scheduleOnce(10.seconds) 但最好在处理完定义的元素计数而不是时间后停止。
    • 不需要kill开关,你可以使用.take(n)
    • 我知道,但我想获取 n 个元素,停止流,调用另一个执行其他操作的方法,然后调用 system.terminate()。我想测量处理例如 100 个元素需要多长时间。
    • 仍然,不需要 KillSwitch。使用take(n),并选择一个在完成时实现为Future 的接收器(例如Sink.ignore)。然后,您可以将任何回调附加到您在运行图表时返回的未来。
    • 对不起,我还是不明白该怎么做。作为接收器,我正在使用一个文件,它看起来像这样: Sink[String, Future[IOResult]] ,如何附加回调?你能写简单的示例代码吗?谢谢
    猜你喜欢
    • 1970-01-01
    • 2016-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多