【问题标题】:how to use akka.actor.Status.Failure to catch the exception如何使用 akka.actor.Status.Failure 捕获异常
【发布时间】:2019-01-25 14:35:01
【问题描述】:

我有一个负责 MongoDB CRUD 操作的演员,它可能会抛出一个 mongoException 我想在我使用询问模式的调用代码中捕获这个异常,但我仍然得到一个 TimeOutException,这在我的案例来自我的文档over this link

警告 要完成异常,您需要向发件人发送 akka.actor.Status.Failure 消息。当参与者在处理消息时抛出异常时,这不会自动完成。

我按照文档中给出的代码 sn-p

这是我的代码

class test extends Actor {

def receive () {

case GetRecordLists=>
try {  
 //some operations here 
   sender ! resultList
}
catch {
  mongoEX:MongoException=>
  log.error("got mongodb exception",mongoex)
  sender ! akka.actor.Status.Failure(mongoEx)
  throw mongoEx

e:Exception=>
  log.error("got exception",e)
  sender ! akka.actor.Status.Failure(e)
  throw e
}

}

}


class MainClass extends App {

try {
     val future: Future[scala.collection.mutable.Set[String]] = ask(test, GetRecordLists).mapTo[scala.collection.mutable.Set[String]]
     val results = Await.result(future, timeout.duration)
    }
    catch  {
      case e:Exception=>log.error("got the exception in main class ",e)
      throw new Exception(e)
    }
}

这里的预期行为是捕获MongoException 但我得到了

java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[scala-library-2.11.1.jar:na]
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[scala-library-2.11.1.jar:na]
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:111) ~[scala-library-2.11.1.jar:na]
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ~[scala-library-2.11.1.jar:na]
    at scala.concurrent.Await$.result(package.scala:111) ~[scala-library-2.11.1.jar:na]
    at MainClass(MainClass.scala:118) [xyz_2.11.jar:0.1.0-SNAPSHOT]

【问题讨论】:

    标签: scala akka


    【解决方案1】:

    你所做的,看起来是正确的。超时错误的唯一解释是对mongo的操作还没有完成,5 seconds还不够。

    请检查处理成功和失败以及超时情况的非常简化的代码。

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.actor.Status.Failure
    import akka.util.Timeout
    import akka.pattern.ask
    
    import concurrent.duration._
    
    class TestActor(body: () => String) extends Actor {
      override def receive: Receive = {
        case msg => try {
          sender() ! body()
        } catch {
          case ex: Throwable =>
            sender() ! Failure(ex)
        }
      }
    }
    
    object TestApp extends App {
      val system = ActorSystem("test")
      import system.dispatcher
      implicit val timeout: Timeout = 1.second
      val actorOk = system.actorOf(Props(new TestActor(() => "Hello")))
      val actorNok = system.actorOf(Props(new TestActor(() => sys.error("Boom"))))
      val actorTimeout = system.actorOf(Props(new TestActor(() => {Thread.sleep(3000); ""})))
      (actorOk ? "some message").mapTo[String].onComplete(println)
      (actorNok ? "some message").mapTo[String].onComplete(println)
      (actorTimeout ? "some message").mapTo[String].onComplete(println)
      Thread.sleep(2000)
      system.terminate()
    }
    

    打印

    Success(Hello)
    Failure(java.lang.RuntimeException: Boom)
    Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://test/user/$c#-765807830]] after [1000 ms]. Message of type [java.lang.String]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-01-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-12
      • 1970-01-01
      相关资源
      最近更新 更多