【问题标题】:akka- how to ensure all responses of dynamic number of actors are returned to parent actor?akka-如何确保将动态数量的参与者的所有响应返回给父参与者?
【发布时间】:2018-11-21 05:29:54
【问题描述】:

每次我的程序启动时,我都需要创建可变数量的参与者,然后必须确保在一段时间后返回所有响应。这个 link 为固定数量的演员提供了一个好主意,但动态数量呢?

这是我创建actor并将消息传递给它们的代码:

   ruleList = ...
   val childActorList: Iterable[ActorRef] = ruleList.map(ruleItem =>
    context.actorOf(DbActor.props(ruleItem.parameter1, ruleItem.parameter2)))

  implicit val timeout = Timeout(10.second)
  childActorList.foreach(childActor =>
    childActor ? (tempTableName, lastDate)
  )

更新-1

根据@Raman Mishra guides,我更新了我的代码如下,这是父actor中的代码:

override val supervisorStrategy: SupervisorStrategy = {
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
  case exp: SQLException => //Resume;
   throw exp
  case exp:AskTimeoutException =>  throw exp
  case other: Exception => throw other
 }
}

override def receive: Receive = {

case Start(tempTableName, lastDate) => {

implicit val timeout = Timeout(10.second)
ruleList.foreach { ruleItem =>
    val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
    ask(childActor, (tempTableName, lastDate)).mapTo[Seq[Int]] 
  onComplete {
  lastDate)).mapTo[Seq[Int]] onComplete {
      case util.Success(res) => println("done" + res + ruleItem._2)
      case util.Failure(exp: AskTimeoutException) => println("Failed query:" + ruleItem._2); throw exp
      case other => println(other)
    }
  }

在儿童演员中:

  case (brokerTableName, lastDate) => {
    Logger("Started query by actor" + self.path.name + ':' + 
  val repo = new Db()
  val res = repo.getAggResult(query = (brokerTableName, lastDate))

  val resWrapper = res match {
    case elem: Future[Any] => elem
    case elem:Any => Future(elem)
  }
  resWrapper pipeTo self
}
case res:List[Map[Any, Any]] => {
  // here final result is send to parent actor
  repo.insertAggresults(res, aggTableName) pipeTo context.parent
}

现在,每当我运行主应用程序时,首先,父演员启动并创建子演员并使用 ask 方法向他们发送消息。子actor执行他们的任务,但这里的问题是子actor响应永远不会返回给父actor,并且在每次运行应用程序时,都会出现AskTimeoutException。我怀疑onComplete 方法的使用是否正确。任何帮助将不胜感激。

“Updated-2”

我发现问题在于使用context.parent 而不是sender()。另外,当我将结果的第一部分发送给发件人,并且发件人要求第二部分时,问题已解决,但我不知道这里发生了什么,为什么我不能通过管道发送给自己并将最终结果返回给父母?

这是最后一个代码:

在父演员中:

    override def receive: Receive = {

case Start(tempTableName, lastDate) => {
  println("started: called by remote actor")
  implicit val timeout = Timeout(5 second)
  ruleList.foreach { ruleItem =>
    val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
    ask(childActor, Broker(tempTableName, lastDate)) onComplete {
      //        (childActor ? Broker(tempTableName, lastDate)).mapTo[Seq[Int]] onComplete {
      case util.Success(res: List[Map[Any, Any]]) => (childActor ? res) onComplete {
        case util.Success(res: Seq[Any]) => println("Successfull- Num,ber of documents:" + res.length + " " + ruleItem._2)
        case util.Failure(exp: AskTimeoutException) => println("Failed for writing - query:" + ruleItem._2); throw exp
      }
      case util.Failure(exp: AskTimeoutException) => println("Failed for reading - query :" + ruleItem._2); throw exp
      case other => println(other)
     }
   }

 }

}

在儿童演员中:

  case (brokerTableName, lastDate) => {
    Logger("Started query by actor" + self.path.name + ':' + 
  val repo = new Db()
  val res = repo.getAggResult(query = (brokerTableName, lastDate))

  val resWrapper = res match {
    case elem: Future[Any] => elem
    case elem:Any => Future(elem)
  }
  resWrapper pipeTo sender()
}
case res:List[Map[Any, Any]] => {
  // here final result is send to parent actor
  repo.insertAggresults(res, aggTableName) pipeTo sender()
}

【问题讨论】:

  • 你能显示你的孩子演员的接收方法吗?在那里你需要使用 pipeTo sender 将响应发送回询问参与者
  • Thnx,我用的是同一个pipeTo,如果有些结果永远没有完成怎么办?
  • 您将获得 akkaAskTimeoutException,然后您可以使用监督策略处理该异常。
  • 感谢您非常早的回复,如果返回的结果不是未来类型,我不能使用 pipeTo 方法进行数据库查询的实例呢?据我所知,此方法用于将来的响应
  • 您能否用所有这些疑问更新您的问题或再问一个。您可以简单地将您的响应包装到没有问题的未来

标签: scala akka


【解决方案1】:

回复sender() 有效而回复context.parent 无效的原因是 ask 创建了一个临时参与者来处理响应。您需要回复这个临时演员:发件人(与父母不同)。

还不清楚getAggResult 方法是否阻塞。如果是这样,这将无济于事(请参阅here)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-02
    • 2015-09-04
    • 2011-07-26
    相关资源
    最近更新 更多