【问题标题】:AKKA Shutdown Pattern doesn't workAKKA 关机模式不起作用
【发布时间】:2015-11-16 06:34:02
【问题描述】:

在阅读了 AKKA 团队 Shutdown Patterns in AKKA 2 撰写的这篇优秀博客后,我运行了代码,它确实有效。

但是当我做另一个稍微改变的实验时,在工人中抛出一个异常,那么这种模式就行不通了。既然工人在工作期间可能会抛出任何类型的异常,这是合理的,对吧?

以下是我的代码,两个文件:

Reaper.scala,抄自上述文章:

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

TestWorker.scala

import akka.actor.{SupervisorStrategy, Props, ActorSystem, Actor}
import Reaper._


class TestReaper extends Reaper {
  def allSoulsReaped(): Unit = context.system.shutdown()
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
}

// The reaper sends this message to all workers to notify them to start work
case object StartWork

class TestWorker extends  Actor {
  def receive = {
    case StartWork =>
      // do real work ...
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

object TestWorker {
  def main(args: Array[String]) : Unit = {
    val system = ActorSystem("system")
    val reaper = system.actorOf(Props[TestReaper])
    val worker1 = system.actorOf(Props[TestWorker])
    val worker2 = system.actorOf(Props[TestWorker])

    reaper ! WatchMe(worker1)
    reaper ! WatchMe(worker2)
    Thread.sleep(3000) // make sure WatchMe will be delivered before StartWork
    worker1 ! StartWork
    worker2 ! StartWork
    system.awaitTermination()
  }
}

这个程序将永远挂起。

如果工人抛出未捕获的异常,收割者似乎无法接收Terminated 消息

谁能告诉我为什么?非常感谢!

正确答案@mattinbits:

这个程序永远挂起的原因是在我的代码中TestWorker 不是TestReaper 的孩子,甚至TestReaper 调用context.watch(ref)

context.watch() 并不意味着成为一个孩子。 context.watch(ref) 只是意味着TestReaper 将在TestWorker 演员死亡时得到通知。

SupervisorStrategycontext.watch() 是两个不同的东西。 SupervisorStrategy 只会影响所有儿童演员。

override val supervisorStrategy = SupervisorStrategy.stoppingStrategy 放入TestReaper 不会使TestWorkerTestWorker 内部发生异常时停止。相反,我们需要更改TestWorkers 父actor的SupervisorStrategy。由于上面代码中的所有actor都是system.actorOf()创建的,它们都是Guardian Actor /user的孩子,所以实际上我们需要改变/useractor的监督策略,在akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } }中添加akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } }

但是,最好使用另一个 Actor 作为监督 Actor,就像 @mattinbits 在他的代码中所做的那样。

【问题讨论】:

  • 您确认Reaper 收到WatchMe 消息的时间早于工人收到StartWork 消息的时间吗?由于! 是异步的,所以StartWork 有可能出现在WatchMe 之前
  • 我在reaper ! WatchMe(worker2)worker1 ! SartWork 之间添加了Thread.sleep(3000),仍然挂起

标签: akka


【解决方案1】:

光看演员是不够的,你还必须确保演员停下来(这是发送Terminated的条件)。

默认情况下,当actor抛出异常时,策略是重新启动它。您需要给演员一个主管,它将应用Stop 指令。

看看下面的内容,两个测试都通过了(收割者与上面的版本没有变化):

import java.util.concurrent.TimeoutException
import Reaper.WatchMe
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.testkit.{TestProbe, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike, WordSpec}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case object StartWork

class TestReaper extends Reaper {
  def allSoulsReaped(): Unit = context.system.shutdown()
}

class TestWorker extends  Actor {
  def receive = {
    case StartWork =>
      // do real work ...
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

class TestParent(reaper: ActorRef, probe: ActorRef) extends Actor {

  def receive = {
    case "Start" =>
      val worker1 = context.actorOf(Props[TestWorker])
      val worker2 = context.actorOf(Props[TestWorker])
      reaper ! WatchMe(worker1)
      reaper ! WatchMe(worker2)
      worker1 ! StartWork
      worker2 ! StartWork
  }

  override def supervisorStrategy = OneForOneStrategy() {
    case ex: IllegalStateException =>
      probe ! "Stopped a worker"
      Stop
  }
}


class TestSupervision extends TestKit(ActorSystem("Test"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll{

  "Supervision" should {

    "Stop the actor system when the parent stops the workers" in {
      val reaper = system.actorOf(Props[TestReaper])
      val probe = TestProbe()
      val parent = system.actorOf(Props(new TestParent(reaper, probe.ref)))
      parent ! "Start"
      probe.expectMsg("Stopped a worker")
      probe.expectMsg("Stopped a worker")
      import system.dispatcher
      val terminatedF = Future {
        system.awaitTermination()
      }
      Await.ready(terminatedF, 2 seconds)
    }
  }

  override def afterAll(){
    system.shutdown()
  }
}

class TestLackSupervision extends TestKit(ActorSystem("Test2"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll{

  "Lack of Supervision" should {
    "Not stop the actor system when the workers don't have an appropriate parent" in {
      val reaper = system.actorOf(Props[TestReaper])
      val worker1 = system.actorOf(Props[TestWorker])
      val worker2 = system.actorOf(Props[TestWorker])
      reaper ! WatchMe(worker1)
      reaper ! WatchMe(worker2)
      import system.dispatcher
      val terminatedF = Future { system.awaitTermination()}
      a [TimeoutException] should be thrownBy Await.ready(terminatedF, 2 seconds)
    }
  }

  override def afterAll(){
    system.shutdown()
  }
}

默认情况下,当演员抛出异常时,他们会重新启动。由于监督策略是从父级应用到子级的,因此存在 TestParent 以对子级强制执行 Stop 指令。由于这个原因,您的原始代码将不起作用。

如果您希望顶级演员(使用system.actorOf 启动的演员)在异常时停止,您可以设置配置属性akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" 但在我的示例中,我更喜欢使用父演员,因为演员层次结构是一种正常的方式在 Akka 组织监督。

要作为应用程序运行,请执行以下类似操作:

object Main extends App {
  val system = ActorSystem("Example")
  val reaper = system.actorOf(Props[TestReaper])
  val dummyProbe = system.actorOf(Props(new Actor{

    def receive = {
      case "Stopped a worker" => println("Stopped a worker")
    }

  }))
  val parent = system.actorOf(Props(new TestParent(reaper, dummyProbe)))
  parent ! "Start"
  system.awaitTermination()
}

要阻止异常在命令行上打印并混淆输出,请更改监督策略如下:

override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {...}

【讨论】:

  • 好点,但实际上默认情况下preRestart()会停止actor,但是,即使我在父actor中添加一行override val supervisorStrategy = SupervisorStrategy.stoppingStrategy,父actor仍然不会收到Terminated消息和程序仍然会挂在那里。
  • preRestart 调用 postStop,但它不发送 Terminated 消息。 doc.akka.io/docs/akka/snapshot/scala/…
  • 已经用一个经过测试的工作示例更新了我的答案。
  • 我试过你的代码,它可以工作,AKKA 团队博客中的代码也可以在联合测试中工作,但是在我把代码取出并放入 main() 函数后,它就挂在那里,父actor不会收到Terminated消息。你能修改你的代码以有一个main()入口函数吗?
  • 我添加了一个在应用程序中使用此代码的示例,该应用程序在使用 sbt 'run-main Main' 运行时对我有用
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-27
  • 1970-01-01
  • 2018-04-07
相关资源
最近更新 更多