【问题标题】:Interact with Akka actor outside actors与演员外的阿卡演员互动
【发布时间】:2013-05-19 20:01:36
【问题描述】:

我想通过我自己的线程与 Akka 演员互动。目前,我喜欢这样:

val res = Await.result(aref ? GroupReceive(fromRank), timeout.duration).asInstanceOf[T]

但我不确定这实际上是如何与我的线程交互的?我希望接收是异步的,即我想在接收时挂断线程以允许完成一些其他工作。我最近刚读到 Akka 收件箱系统。 inbox akka api

我想我记得 Await 每次都会创建一个新演员。 await+ask 和 inbox 有什么区别,谁能给我一个例子,说明如何创建一个收件箱并使用它与“外部”的演员交流?

编辑 澄清一下,我不希望同一个线程继续工作,我希望它停止占用 cpu 核心并让其他线程继续工作,直到它收到一些东西,然后再次醒来。

【问题讨论】:

  • 在akka中你最好使用mapTo[Type]而不是asInstanceOf
  • asInstanceOf 在失败时抛出异常。 mapTo 返回一个失败的未来。
  • 危险的是,asInstanceOf 在调用特定方法时实际上可能会成功并在稍后失败。
  • 是的,您可以阅读有关使用 akka 和 futures here
  • documentation中有一个例子。

标签: scala asynchronous akka actor inbox


【解决方案1】:

正如 Akka 的 Future 文档中所写,使用 Await 会阻塞当前线程,直到等待结果。

例子

import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]

这将导致当前线程阻塞并等待 Actor '完成' Future 的回复。

Use With Actors

【讨论】:

  • 能否详细说明这与使用收件箱的区别?
  • @Felix 抱歉从未使用过 akka 的 dsl,但我可以看到收件箱的主要用途是要求不能 1)接收多个回复和 2)观看其他演员的生命周期。 The Inbox
  • @senia 仍然不同意你的观点,要求通过 ActorSelection 返回带有 ActorRef 的 Future,但它不会创建新的 Actor
  • @Alexlv,如果您在 Actor 之外使用 ask ,则会在后台创建一个临时 Actor。这是因为为了做出响应,您需要通过 !操作员。 Actor 是短暂的,在收到响应或发生超时后停止。
  • 我如何将它与广播路由器一起使用,它应该为每个未来多次回复?谢谢。
【解决方案2】:

Await.receive 是 Scala 并发 API 的一部分,与参与者无关。它的目的是阻塞当前线程,直到提供的未来完成,或者超时限制开始,整个事情以超时异常结束。

ask 运算符? 确实会创建一个临时actor,其唯一目的是等待aref 变量指向的actor 的回复,并完成您调用ask 运算符时收到的future回复。

所以你的代码基本上阻塞了整个线程。如前所述,如果您想释放当前线程并继续做一些其他工作,您可以将回调附加到未来。

implicit val ctx: ExecutionContext = //provide execution context here
implicit val timeout: Timeout = // provide timeout here
aref ? GroupReceive(fromRank)) onSuccess { res =>
   //do something with res here, asynchronously
}
// some other code which runs without being blocked...

上面的代码可以用你上面提到的actor DSL重写:

import akka.actor.ActorDSL._
implicit val actorSystem: ActorSystem = // provide an actor system here or any actor ref factory

actor(new Act {
  aref ! GroupReceive(fromRank)
  context.setReceiveTimeout(timeout) //optional
  become {
    case ReceiveTimeout => {
      //handle the timeout
      context.stop(self)
    }
    case res => {
      //do your thing with res, asynchronously
      context.stop(self)
    }
  }
}

//some other code which won't wait for the above operations

后一个版本还创建了一个新的临时actor,它发送GroupReceive 消息,然后等待回复,之后它会杀死自己。

底线是,为了接收来自演员的消息,您自己必须是演员。 Actor 不能只向 ActorRef 以外的其他对象发送消息。

因此,您要么使用 ask 模式在幕后创建一个临时演员,并管理这个临时演员的生命周期本身,向您展示一个简单的美好未来,或者您可以自己创建临时演员,但随后您必须管理它的生命周期(即记住一旦它完成它的工作就杀死它)

选择最适合您的选项。

【讨论】:

  • 关于响应中的 DSL 示例部分:尽管这适用于更高延迟的请求,但对于非常快速的请求/响应,它可能会失败,因为 become 需要一些时间才能启动。应该正确初始化 DSL 操作喜欢在这个问题中始终有效:stackoverflow.com/questions/17851849/….
【解决方案3】:

如果您不想在调用方阻塞,则不要使用 Await,而是使用非阻塞回调,例如 onSuccess、onFailure 和 onComplete。当您执行此操作时,未来的任务将被放入询问(?)时范围内的任何 ExecutionContext 中。收到响应后,将通过 ExecutionContext 异步调用此回调。这样可以避免在向参与者发出请求的线程中一起阻塞,然后在与 ExecutionContext 相关联的线程池中处理回调。

此外,我相信您提到的收件箱内容旨在测试 REPL 中的演员内容(至少 ActorDsl 上的文档是这样说的)。坚持使用从演员外部询问的方法。让 akka 创建一个短暂的演员,它需要在后台进行非演员到演员调用的通信。然后按照我上面的建议切换到非阻塞回调。我相信这就是你要找的。

【讨论】:

    猜你喜欢
    • 2014-12-29
    • 2021-05-13
    • 2018-02-02
    • 1970-01-01
    • 2013-06-12
    • 2020-02-07
    • 2014-06-29
    • 1970-01-01
    • 2021-02-11
    相关资源
    最近更新 更多