【问题标题】:Retry on minor Exceptions for a long-living akka actor为长寿的 akka 演员重试次要例外
【发布时间】:2018-01-25 11:51:03
【问题描述】:

我有一个在应用程序启动时作为另一个actor 的子级创建的actor,并且每天从父级接收一条消息,以执行从某个SFTP 服务器获取一些文件的操作。

现在,可能有一些小的临时连接异常会导致操作失败。在这种情况下,需要重试。

但在某些情况下,可能会抛出异常并且不会在重试时解决(例如:找不到文件、某些配置不正确等)

因此,在这种情况下,考虑到参与者将在很长一段时间后(每天一次)收到消息,什么可能是合适的重试机制和监督策略。

在这种情况下,发送给actor的消息并不是错误的输入——它只是一个触发器。示例:

case object FileFetch

如果我在父母中有这样的监督策略,它将在每个次要/主要异常上重新启动失败的孩子而不重试。

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: Exception                => Restart
}

我想要的是这样的:

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.inf) {
    case _: MinorException           => Retry same message 2, 3 times and then Restart
    case _: Exception                => Restart
}

【问题讨论】:

    标签: scala akka actor akka-supervision akka-actor


    【解决方案1】:

    “重试”或在发生异常时重新发送消息是您必须自己实现的。来自documentation

    如果在处理消息时抛出异常(即从其邮箱中取出并移交给当前行为),则该消息将丢失。重要的是要了解它不会放回邮箱。因此,如果您想重试处理消息,您需要通过捕获异常并重试您的流程来自己处理它。确保限制重试次数,因为您不希望系统活锁(因此会消耗大量 CPU 周期而没有进展)。

    如果您想在MinorException 事件中重新发送FileFetch 消息给孩子而不重新启动孩子,那么您可以在孩子中捕获异常以避免触发监督策略。在 try-catch 块中,您可以向父级发送消息并让父级跟踪重试次数(例如,如果您希望父级制定某种退避策略,则可能在此消息中包含时间戳) .在孩子:

    def receive = {
      case FileFetch =>
        try {
          ...
        } catch {
          case m: MinorException =>
            val now = System.nanoTime
            context.parent ! MinorIncident(self, now)
        }
      case ...
    } 
    

    在父级中:

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
        case _: Exception => Restart
      }
    
    var numFetchRetries = 0
    
    def receive = {
      case MinorIncident(fetcherRef, time) =>
        log.error(s"${fetcherRef} threw a MinorException at ${time}")
        if (numFetchRetries < 3) { // possibly use the time in the retry logic; e.g., a backoff
          numFetchRetries = numFetchRetries + 1
          fetcherRef ! FileFetch
        } else {
          numFetchRetries = 0
          context.stop(fetcherRef)
          ... // recreate the child
        }
      case SomeMsgFromChildThatFetchSucceeded =>
        numFetchRetries = 0
      case ...
    }
    

    或者,您可以在发生MinorException 的情况下将主管策略设置为子级Resume,而不是捕获子级中的异常,同时仍然让父级处理消息重试逻辑:

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = -1, withinTimeRange = Duration.Inf) {
        case m: MinorException =>
          val child = sender()
          val now = System.nanoTime
          self ! MinorIncident(child, now)
          Resume
        case _: Exception => Restart
      }
    

    【讨论】:

      猜你喜欢
      • 2014-03-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-24
      • 1970-01-01
      • 1970-01-01
      • 2016-02-03
      • 2021-05-13
      相关资源
      最近更新 更多