【发布时间】:2015-11-16 06:34:02
【问题描述】:
在阅读了 AKKA 团队 Shutdown Patterns in AKKA 2 撰写的这篇优秀博客后,我运行了代码,它确实有效。
但是当我做另一个稍微改变的实验时,在工人中抛出一个异常,那么这种模式就行不通了。既然工人在工作期间可能会抛出任何类型的异常,这是合理的,对吧?
以下是我的代码,两个文件:
Reaper.scala,抄自上述文章:
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
TestWorker.scala
import akka.actor.{SupervisorStrategy, Props, ActorSystem, Actor}
import Reaper._
class TestReaper extends Reaper {
def allSoulsReaped(): Unit = context.system.shutdown()
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
}
// The reaper sends this message to all workers to notify them to start work
case object StartWork
class TestWorker extends Actor {
def receive = {
case StartWork =>
// do real work ...
throw new IllegalStateException("Simulate uncaught exceptions during work")
}
}
object TestWorker {
def main(args: Array[String]) : Unit = {
val system = ActorSystem("system")
val reaper = system.actorOf(Props[TestReaper])
val worker1 = system.actorOf(Props[TestWorker])
val worker2 = system.actorOf(Props[TestWorker])
reaper ! WatchMe(worker1)
reaper ! WatchMe(worker2)
Thread.sleep(3000) // make sure WatchMe will be delivered before StartWork
worker1 ! StartWork
worker2 ! StartWork
system.awaitTermination()
}
}
这个程序将永远挂起。
如果工人抛出未捕获的异常,收割者似乎无法接收Terminated 消息
谁能告诉我为什么?非常感谢!
正确答案@mattinbits:
这个程序永远挂起的原因是在我的代码中TestWorker 不是TestReaper 的孩子,甚至TestReaper 调用context.watch(ref)。
context.watch() 并不意味着成为一个孩子。 context.watch(ref) 只是意味着TestReaper 将在TestWorker 演员死亡时得到通知。
SupervisorStrategy 和 context.watch() 是两个不同的东西。 SupervisorStrategy 只会影响所有儿童演员。
将override val supervisorStrategy = SupervisorStrategy.stoppingStrategy 放入TestReaper 不会使TestWorker 在TestWorker 内部发生异常时停止。相反,我们需要更改TestWorkers 父actor的SupervisorStrategy。由于上面代码中的所有actor都是system.actorOf()创建的,它们都是Guardian Actor /user的孩子,所以实际上我们需要改变/useractor的监督策略,在akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } }中添加akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } }
但是,最好使用另一个 Actor 作为监督 Actor,就像 @mattinbits 在他的代码中所做的那样。
【问题讨论】:
-
您确认
Reaper收到WatchMe消息的时间早于工人收到StartWork消息的时间吗?由于!是异步的,所以StartWork有可能出现在WatchMe之前 -
我在
reaper ! WatchMe(worker2)和worker1 ! SartWork之间添加了Thread.sleep(3000),仍然挂起
标签: akka