【发布时间】:2017-10-09 14:22:13
【问题描述】:
我目前正在开发一个带有注册流程的应用程序。此注册过程将在某些时候以异步方式与外部系统通信。为了使这个问题保持简洁,我将向您展示我写的两个重要演员:
SignupActor.scala
class SignupActor extends PersistentFSM[SignupActor.State, Data, DomainEvt] {
private val apiActor = context.actorOf(ExternalAPIActor.props(new HttpClient))
// At a certain point, a CreateUser(data) message is sent to the apiActor
}
ExternalAPIActor.scala
class ExternalAPIActor(apiClient: HttpClient) extends Actor {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message.foreach(context.system.scheduler.scheduleOnce(3 seconds, self, _))
super.preRestart(reason, message)
}
def receive: Receive = {
case CreateUser(data) =>
Await.result(
apiClient.post(data)
.map(_ => UserCreatedInAPI())
.pipeTo(context.parent),
Timeout(5 seconds).duration
)
}
}
此设置似乎按预期工作。当外部 API 出现问题(例如超时或网络问题)时,HttpClient::post 返回的Future 失败,并由于Await.result 而导致异常。这反过来又感谢SignupActor 父actor 的SupervisorStrategy,将重新启动ExternalAPIActor,我们可以在其中将最后一条消息重新发送给它自己,并稍有延迟以避免死锁。
我发现此设置存在一些问题:
- 在
ExternalAPIActor的receive方法中,发生阻塞。据我了解,Actor 内的阻塞被认为是一种反模式。 - 用于重新发送消息的延迟是静态的。如果 API 长时间不可用,我们将继续每 3 秒发送一次 HTTP 请求。我想在这里使用某种指数退避机制。
要继续使用后者,我在SignupActor 中尝试了以下操作:
SignupActor.scala
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
ExternalAPIActor.props(new HttpClient),
childName = "external-api",
minBackoff = 3 seconds,
maxBackoff = 30 seconds,
randomFactor = 0.2
)
)
private val apiActor = context.actorOf(supervisor)
不幸的是,这似乎根本没有做任何事情——ExternalAPIActor 的 preRestart 方法根本没有被调用。当用Backoff.onStop 替换Backoff.onFailure 时,会调用preRestart 方法,但根本没有任何指数退避。
鉴于以上情况,我的问题如下:
- 是否使用
Await.result是推荐的(唯一的?)方法来确保捕获并相应地处理从actor 中调用的服务返回的Future中抛出的异常?我的特定用例中一个特别重要的部分是消息不应该被丢弃,而是在出现问题时重试。或者是否有其他(惯用的)方式可以在 Actor 中处理异步上下文中抛出的异常? - 在这种情况下如何使用
BackoffSupervisor?再次重申:非常重要的是,导致异常的消息不会被丢弃,而是重试 N 次(由SupervisorStrategy的maxRetries参数确定。
【问题讨论】:
-
我喜欢这个标题。
标签: scala asynchronous parallel-processing akka actor