【问题标题】:Akka RestartSink doesn't restart streamAkka RestartSink 不会重新启动流
【发布时间】:2019-05-14 04:40:49
【问题描述】:

我在这里有一个简单的 akka 流,但无法弄清楚为什么在 killSwitches 中止处理后流永远不会正确重新启动。任何指针将不胜感激

object TestMain extends App {
    implicit val actorSystem = ActorSystem("TestMain")
    implicit val materializer = ActorMaterializer()
    val sharedKillSwitch = KillSwitches.shared("fp-change-kill-switch")

    // This stream keeps restarting after aborting and stops after 10 times 
    RestartSource.withBackoff(1 second, 1 second, 0.2, 10) {
        () => Source.tick(1 second, 1 second, 200).via(sharedKillSwitch.flow)
    }.to(Sink.foreach(println((_)))).run()

   // This does not restart at all
   Source.tick(1 second, 1 second, 400)
    .via(sharedKillSwitch.flow)
    .to(RestartSink.withBackoff[Int](1 second, 1 second, 0.2, 10) {
      () => Sink.foreach(println(_))
    }).run()

  Thread.sleep(5000)
  sharedKillSwitch.abort(new Exception(""))

}

我需要的是不重新启动整个流(包括源),而只重新启动接收器 - 就像案例 2 中只有 RestartSink 一样

【问题讨论】:

    标签: scala akka actor akka-stream


    【解决方案1】:

    RestartSink 和 RestartSource 是有区别的。

    当流或与其连接的接收器发出错误或取消时,RestartSource 会重新启动。

    RestartSink 只检查它管理的接收器的取消。我认为您应该能够将 KillSwitch 流集成到假设在重新启动时重新创建的接收器中。

    看看这个documentation

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-06-12
      • 1970-01-01
      • 1970-01-01
      • 2018-01-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多