【问题标题】:Akka Streams Error Handling. How to know which row failed?Akka 流错误处理。如何知道哪一行失败?
【发布时间】:2017-09-21 02:23:57
【问题描述】:

我读过这篇关于 akka 流错误处理的文章

http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html

并编写了这段代码。

val decider: Supervision.Decider = {
  case _: Exception => Supervision.Restart
  case _ => Supervision.Stop
}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

val source = Source(1 to 10)
 val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
 val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
 val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
  import GraphDSL.Implicits._
  source ~> flow ~> s.in
  ClosedShape
})
val future = graph.run()
future.onComplete{ _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

这很好用....除了我需要扫描输出以查看未处理的行。有没有办法让我打印/记录失败的行? [没有在我编写的每个流程中放置明确的 try/catch 块?]

因此,例如,如果我使用演员(而不是流),我可以编写演员的生命周期事件,并且我可以记录演员何时重新启动以及重新启动时正在处理的消息.

但在这里我没有明确使用演员(尽管他们在内部使用)。 Flow / Source / Sink 是否有生命周期事件?

【问题讨论】:

  • 你能分叉流并将坏的推到一个不同的“坏”队列,在那里他们刚刚被注销(或者你想对他们做什么)?
  • 所以 .. 为了分叉流,您仍然必须捕获所有异常,并且在 catch 块中路由到不同的流......这不是很乏味吗?是否有一个简单的方法是用失败的消息分叉流?
  • 哦,我没看懂你的例子,我以为你的条件语句表明你知道要提前注意的情况。

标签: scala akka-stream


【解决方案1】:

只需对您的代码稍作修改:

  val decider: Supervision.Decider = {
  case e: Exception =>
    println("Exception handled, recovering stream:" + e.getMessage)
    Supervision.Restart
  case _ => Supervision.Stop
}

如果您将有意义的消息传递给流中的异常,例如该行,您可以在监督决策器中打印它们。

我使用println 给出了一个快速而简短的答案,但强烈建议使用 一些日志库,例如scala-logging

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-18
    • 1970-01-01
    • 2021-06-10
    • 1970-01-01
    • 1970-01-01
    • 2017-01-06
    • 1970-01-01
    相关资源
    最近更新 更多