【问题标题】:Child actors, futures and exceptions儿童演员、未来和例外
【发布时间】:2017-10-09 14:22:13
【问题描述】:

我目前正在开发一个带有注册流程的应用程序。此注册过程将在某些时候以异步方式与外部系统通信。为了使这个问题保持简洁,我将向您展示我写的两个重要演员:

SignupActor.scala

class SignupActor extends PersistentFSM[SignupActor.State, Data, DomainEvt] {
    private val apiActor = context.actorOf(ExternalAPIActor.props(new HttpClient))

    // At a certain point, a CreateUser(data) message is sent to the apiActor
}

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor {
    override def preRestart(reason: Throwable, message: Option[Any]) = {
        message.foreach(context.system.scheduler.scheduleOnce(3 seconds, self, _))
        super.preRestart(reason, message)
    }

    def receive: Receive = {
        case CreateUser(data) =>
            Await.result(
                apiClient.post(data)
                    .map(_ => UserCreatedInAPI())
                    .pipeTo(context.parent),
                Timeout(5 seconds).duration
            )
    }
}

此设置似乎按预期工作。当外部 API 出现问题(例如超时或网络问题)时,HttpClient::post 返回的Future 失败,并由于Await.result 而导致异常。这反过来又感谢SignupActor 父actor 的SupervisorStrategy,将重新启动ExternalAPIActor,我们可以在其中将最后一条消息重新发送给它自己,并稍有延迟以避免死锁。

我发现此设置存在一些问题:

  • ExternalAPIActorreceive 方法中,发生阻塞。据我了解,Actor 内的阻塞被认为是一种反模式。
  • 用于重新发送消息的延迟是静态的。如果 API 长时间不可用,我们将继续每 3 秒发送一次 HTTP 请求。我想在这里使用某种指数退避机制。

要继续使用后者,我在SignupActor 中尝试了以下操作:

SignupActor.scala

val supervisor = BackoffSupervisor.props(
    Backoff.onFailure(
      ExternalAPIActor.props(new HttpClient),
      childName = "external-api",
      minBackoff = 3 seconds,
      maxBackoff = 30 seconds,
      randomFactor = 0.2
    )
  )

private val apiActor = context.actorOf(supervisor)

不幸的是,这似乎根本没有做任何事情——ExternalAPIActorpreRestart 方法根本没有被调用。当用Backoff.onStop 替换Backoff.onFailure 时,会调用preRestart 方法,但根本没有任何指数退避。

鉴于以上情况,我的问题如下:

  • 是否使用Await.result 是推荐的(唯一的?)方法来确保捕获并相应地处理从actor 中调用的服务返回的Future 中抛出的异常?我的特定用例中一个特别重要的部分是消息不应该被丢弃,而是在出现问题时重试。或者是否有其他(惯用的)方式可以在 Actor 中处理异步上下文中抛出的异常?
  • 在这种情况下如何使用BackoffSupervisor?再次重申:非常重要的是,导致异常的消息不会被丢弃,而是重试 N 次(由 SupervisorStrategymaxRetries 参数确定。

【问题讨论】:

  • 我喜欢这个标题。

标签: scala asynchronous parallel-processing akka actor


【解决方案1】:

使用 Await.result 是推荐的(唯一的?)方法来确保 从内部调用的服务返回的 Future 中抛出的异常 演员是否被抓获并进行相应处理?

没有。通常,这不是您希望在 Akka 中处理故障的方式。更好的选择是将失败传递给您自己的演员,完全避免使用Await.result

def receive: Receive = {
  case CreateUser(data) =>
    apiClient.post(data)
      .map(_ => UserCreatedInAPI())
      .pipeTo(self)
  case Success(res) => context.parent ! res
  case Failure(e) => // Invoke retry here
}

这意味着处理失败不需要重新启动,它们都是你的演员正常流程的一部分。

解决此问题的另一种方法是创建“受监督的未来”。取自this blog post

object SupervisedPipe {

  case class SupervisedFailure(ex: Throwable)
  class SupervisedPipeableFuture[T](future: Future[T])(implicit executionContext: ExecutionContext) {
    // implicit failure recipient goes to self when used inside an actor
    def supervisedPipeTo(successRecipient: ActorRef)(implicit failureRecipient: ActorRef): Unit =
      future.andThen {
        case Success(result) => successRecipient ! result
        case Failure(ex) => failureRecipient ! SupervisedFailure(ex)
      }
  }

  implicit def supervisedPipeTo[T](future: Future[T])(implicit executionContext: ExecutionContext): SupervisedPipeableFuture[T] =
    new SupervisedPipeableFuture[T](future)

  /* `orElse` with the actor receive logic */
  val handleSupervisedFailure: Receive = {
    // just throw the exception and make the actor logic handle it
    case SupervisedFailure(ex) => throw ex
  }

  def supervised(receive: Receive): Receive = 
    handleSupervisedFailure orElse receive
}

这样,您只有在获得Failure 后才通过管道发送给自己,否则将其发送给要发送消息的演员,从而避免需要case Success 我添加到receive方法。您需要做的就是将supervisedPipeTo 替换为pipeTo 提供的原始框架。

【讨论】:

  • 感谢您的回答 Yuval。将结果返回给演员本身似乎是在走正确的道路。但是,在您的“受监督的未来”示例中,负责SupervisedFailure 消息的原始消息不会丢失吗?
  • 在第一个代码sn-p中的receive方法中,pipeTo(self)Future结果发送到self,所以你应该是模式匹配 UserCreatedInAPIakka.actor.Status.Failure 而不是 SuccessFailure
【解决方案2】:

好的,我做了更多的思考和修改,我想出了以下内容。

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with Stash {
        import ExternalAPIActor._

        def receive: Receive = {
            case msg @ CreateUser(data) =>
                context.become(waitingForExternalServiceReceive(msg))
                apiClient.post(data)
                    .map(_ => UserCreatedInAPI())
                    .pipeTo(self)
        }

        def waitingForExternalServiceReceive(event: InputEvent): Receive = LoggingReceive {
            case Failure(_) =>
              unstashAll()
              context.unbecome()
              context.system.scheduler.scheduleOnce(3 seconds, self, event)

            case msg:OutputEvent =>
              unstashAll()
              context.unbecome()
              context.parent ! msg

            case _ => stash()
        }
}

object ExternalAPIActor {
    sealed trait InputEvent
    sealed trait OutputEvent

    final case class CreateUser(data: Map[String,Any]) extends InputEvent
    final case class UserCreatedInAPI() extends OutputEvent
}

我使用这种技术来防止原始消息丢失,以防我们调用的外部服务出现问题。在向外部服务请求的过程中,我切换上下文,等待失败的响应,然后再切换回来。感谢Stash trait,我可以确保对外部服务的其他请求也不会丢失。

由于我的应用程序中有多个参与者调用外部服务,因此我将 waitingForExternalServiceReceive 抽象为它自己的特征:

WaitingForExternalService.scala

trait WaitingForExternalServiceReceive[-tInput, +tOutput] extends Stash {

  def waitingForExternalServiceReceive(event: tInput)(implicit ec: ExecutionContext): Receive = LoggingReceive {
    case akka.actor.Status.Failure(_) =>
      unstashAll()
      context.unbecome()
      context.system.scheduler.scheduleOnce(3 seconds, self, event)

    case msg:tOutput =>
      unstashAll()
      context.unbecome()
      context.parent ! msg

    case _ => stash()
  }
}

现在,ExternalAPIActor 可以扩展这个特性:

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with WaitingForExternalServiceReceive[InputEvent,OutputEvent] {
        import ExternalAPIActor._

        def receive: Receive = {
            case msg @ CreateUser(data) =>
                context.become(waitingForExternalServiceReceive(msg))
                apiClient.post(data)
                    .map(_ => UserCreatedInAPI())
                    .pipeTo(self)
        }
}

object ExternalAPIActor {
    sealed trait InputEvent
    sealed trait OutputEvent

    final case class CreateUser(data: Map[String,Any]) extends InputEvent
    final case class UserCreatedInAPI() extends OutputEvent
}

现在,如果出现故障/错误,actor 将不会重新启动,并且消息不会丢失。更重要的是,现在actor的整个流程是非阻塞的。

这个设置(很可能)远非完美,但它似乎完全按照我的需要工作。

【讨论】:

    猜你喜欢
    • 2015-05-06
    • 2015-10-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多