【发布时间】:2019-05-14 04:40:49
【问题描述】:
我在这里有一个简单的 akka 流,但无法弄清楚为什么在 killSwitches 中止处理后流永远不会正确重新启动。任何指针将不胜感激
object TestMain extends App {
implicit val actorSystem = ActorSystem("TestMain")
implicit val materializer = ActorMaterializer()
val sharedKillSwitch = KillSwitches.shared("fp-change-kill-switch")
// This stream keeps restarting after aborting and stops after 10 times
RestartSource.withBackoff(1 second, 1 second, 0.2, 10) {
() => Source.tick(1 second, 1 second, 200).via(sharedKillSwitch.flow)
}.to(Sink.foreach(println((_)))).run()
// This does not restart at all
Source.tick(1 second, 1 second, 400)
.via(sharedKillSwitch.flow)
.to(RestartSink.withBackoff[Int](1 second, 1 second, 0.2, 10) {
() => Sink.foreach(println(_))
}).run()
Thread.sleep(5000)
sharedKillSwitch.abort(new Exception(""))
}
我需要的是不重新启动整个流(包括源),而只重新启动接收器 - 就像案例 2 中只有 RestartSink 一样
【问题讨论】:
标签: scala akka actor akka-stream