【问题标题】:Handling Faults in Akka actors处理 Akka actor 中的错误
【发布时间】:2014-05-31 07:36:32
【问题描述】:

我有一个非常简单的示例,其中我有一个 Actor (SimpleActor),它通过向自身发送消息来执行周期性任务。消息在actor的构造函数中调度。在正常情况下(即没有故障)一切正常。

但是如果 Actor 必须处理错误怎么办。我还有另一个演员 (SimpleActorWithFault)。这个演员可能有缺点。在这种情况下,我自己通过抛出异常来生成一个。当发生故障时(即SimpleActorWithFault 抛出异常),它会自动重新启动。然而,这个重新启动会弄乱 Actor 内部的调度程序,它不再作为例外工作。如果故障发生得足够快,它会产生更多意想不到的行为。

我的问题是在这种情况下处理错误的首选方法是什么?我知道我可以使用Try 块来处理异常。但是,如果我正在扩展另一个无法在超类中放置 Try 的演员,或者当我是演员的例外错误时,该怎么办。

import akka.actor.{Props, ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor

case object MessageA

case object MessageToSelf


class SimpleActor extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  //keeps track of some internal state
  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      //update state and tell the world about its current state 
      count = count + 1
      log.info("[SimpleActor] Got scheduled message at %d".format(count))

    }
  }

}


class SimpleActorWithFault extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      count = count + 1
      log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))

      //at some point generate a fault
      if (count > 5) {
        log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
        throw new Exception("Excepttttttiooooooon")
      }
    }
  }

}


object MainApp extends App {
  implicit val akkaSystem = akka.actor.ActorSystem()
  //Run the Actor without any faults or exceptions 
  akkaSystem.actorOf(Props(classOf[SimpleActor]))

  //comment the above line and uncomment the following to run the actor with faults  
  //akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))

}

【问题讨论】:

    标签: scala akka fault-tolerance error-kernel


    【解决方案1】:

    正确的方法是将冒险行为推到它自己的演员身上。这种模式称为错误内核模式(参见 Akka 并发,第 8.5 节):

    此模式描述了一种非常常识性的监督方法 根据任何易失性将参与者彼此区分开来 声明他们可能持有。

    简而言之,就是状态珍贵的演员不应该 允许失败或重新启动。任何持有宝贵数据的演员都是 受到保护,任何有风险的操作都被归为从属设备 演员,如果重新启动,只会让好事发生。

    错误内核模式意味着进一步降低风险水平 树。

    另见另一个tutorial here

    所以在你的情况下,它会是这样的:

    SimpleActor 
     |- ActorWithFault
    

    这里SimpleActor 充当ActorWithFault主管any actor 的默认监督策略是在 Exception 上重新启动一个子进程,并在其他任何事件上升级: http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html

    升级意味着参与者本身可能会重新启动。由于您真的不想重新启动SimpleActor,您可以让它始终重新启动ActorWithFault,并且永远不会通过覆盖主管策略来升级:

    class SimpleActor {
      override def preStart(){
        // our faulty actor --- we will supervise it from now on
        context.actorOf(Props[ActorWithFault], "FaultyActor") 
      ...
    
      override val supervisorStrategy = OneForOneStrategy () {
        case _: ActorKilledException => Escalate
        case _: ActorInitializationException => Escalate
        case _ => Restart // keep restarting faulty actor
      }
    
    }
    

    【讨论】:

    【解决方案2】:

    为避免搞乱调度程序:

    class SimpleActor extends Actor with ActorLogging {
    
      private var cancellable: Option[Cancellable] = None
    
      override def preStart() = {
        //schedule a message to self every second
        cancellable = Option(context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf))
      }
    
      override def postStop() = {
        cancellable.foreach(_.cancel())
        cancellable = None
      }
    ...
    

    正确处理异常(akka.actor.Status.Failure 是为了在发件人使用 Ask 模式的情况下正确回答问题):

    ...
    def receive: Receive = {
        case MessageA => {
          try {
            log.info("[SimpleActor] Got MessageA at %d".format(count))
          } catch {
            case e: Exception =>
              sender ! akka.actor.Status.Failure(e)
              log.error(e.getMessage, e)
          }
        }
    ...
    

    【讨论】:

    • 不,这不是 Actor 处理故障的方式:您将 Actor 的内部故障给客户端带来负担。相反,异常需要上报给参与者的主管并在该级别进行处理。
    • 是的,我同意。但在某些情况下,我们需要立即通知客户究竟是什么失败了。否则,它总是会以客户端的 AskTimeoutException 结束。
    猜你喜欢
    • 1970-01-01
    • 2016-05-28
    • 1970-01-01
    • 1970-01-01
    • 2023-03-19
    • 2015-06-01
    • 1970-01-01
    • 2023-04-02
    • 2021-11-24
    相关资源
    最近更新 更多