【问题标题】:Initializing an actor before being able to handle some other messages在能够处理一些其他消息之前初始化一个actor
【发布时间】:2013-12-16 00:08:00
【问题描述】:

我有一个演员可以创造另一个演员:

class MyActor1 extends Actor {
  val a2 = system actorOf Props(new MyActor(123))
}

第二个参与者必须在创建后自行初始化(引导),然后才能执行其他工作。

class MyActor2(a: Int) extends Actor {
  //initialized (bootstrapped) itself, potentially a long operation 
  //how?
  val initValue = // get from a server

  //handle incoming messages
  def receive = {
    case "job1" => // do some job but after it's initialized (bootstrapped) itself
  }
}

所以MyActor2 必须做的第一件事就是做一些初始化工作。这可能需要一些时间,因为它是对服务器的请求。只有在它成功完成后,它才必须能够处理通过receive 传入的消息。在此之前 - 它不能那样做。

当然,对服务器的请求必须是异步的(最好使用Future,而不是asyncawait 或其他高级别的东西,如AsyncHttpClient)。我知道如何使用 Future,不过这不是问题。

我如何确保这一点?

p.s.我的猜测是它必须先向自己发送消息。

【问题讨论】:

    标签: scala asynchronous akka future


    【解决方案1】:

    你可以在初始化后使用become方法来改变actor的行为:

    class MyActor2(a: Int) extends Actor {
    
      server ! GetInitializationData
    
      def initialize(d: InitializationData) = ???
    
      //handle incoming messages
      val initialized: Receive = {
        case "job1" => // do some job but after it's initialized (bootstrapped) itself
      }
    
      def receive = {
        case d @ InitializationData =>
          initialize(d)
          context become initialized
      }
    }
    

    请注意,此类参与者将在初始化之前丢弃所有消息。您必须手动保存这些消息,例如使用Stash

    class MyActor2(a: Int) extends Actor with Stash {
    
      ...
    
      def receive = {
        case d @ InitializationData =>
          initialize(d)
          unstashAll()
          context become initialized
        case _ => stash()
      }
    }
    

    如果您不想使用 var 进行初始化,您可以使用 InitializationData 创建初始化行为,如下所示:

    class MyActor2(a: Int) extends Actor {
    
      server ! GetInitializationData
    
      //handle incoming messages
      def initialized(intValue: Int, strValue: String): Receive = {
        case "job1" => // use `intValue` and `strValue` here
      }
    
      def receive = {
        case InitializationData(intValue, strValue) =>
          context become initialized(intValue, strValue)
      }
    }
    

    【讨论】:

    • 对不起,我忘了说我需要用初始化结果初始化val initValue =。所以,最终initValue 必须保存初始化过程的结果。最好的方法是什么?
    • @Alex:查看更新。您可以在没有var 的情况下多次更改行为 - 请参阅this answer
    • 如果我想使用var,那么它最初会具有默认(空)值,然后会被定义?
    • 不应该初始化返回一个Future?否则会阻塞线程。
    • @Alex: @var: 是的,它将是var intValue: Int = _,但我更喜欢没有vars 的解决方案。 @Future:不,def initialized 立即返回,这里不需要Future(实际上这里没有办法使用)。
    【解决方案2】:

    我不知道建议的解决方案是否是个好主意。 发送初始化消息对我来说似乎很尴尬。Actor 有一个生命周期并提供了一些钩子。当您查看API 时,您会发现prestart 挂钩。

    因此我提出以下建议:

    • 创建 Actor 后,将运行其 preStart 挂钩,您可以在其中执行返回未来的服务器请求。
    • 虽然未来尚未完成,但所有传入的消息都会被隐藏。
    • 未来完成时,它使用 context.become 来使用您的真实/正常接收方法。
    • 在成为之后,您可以解开所有内容。

    这是代码的粗略草图(不好的解决方案,请参阅下面的真正解决方案):

    class MyActor2(a: Int) extends Actor with Stash{
    
      def preStart = {
        val future = // do your necessary server request (should return a future)
        future onSuccess {
          context.become(normalReceive)
          unstash()
        }
      }
    
      def receive = initialReceive
    
      def initialReceive = {
        case _ => stash()
      }
    
      def normalReceive = {
        // your normal Receive Logic
      }
    }
    

    更新:根据 Senias 的反馈改进了解决方案

    class MyActor2(a: Int) extends Actor with Stash{
    
      def preStart = {
        val future = // do your necessary server request (should return a future)
        future onSuccess {
          self ! InitializationDone
        }
      }
    
      def receive = initialReceive
    
      def initialReceive = {
        case InitializationDone =>
          context.become(normalReceive)
          unstash()
        case _ => stash()
      }
    
      def normalReceive = {
        // your normal Receive Logic
      }
    
      case class InitializationDone
    }
    

    【讨论】:

    • 抱歉,这是一个糟糕的解决方案。 onSuccess 将在与您的演员无关的某个线程中执行。您将在stash()unstash() 上获得竞争条件。永远不要分享演员的状态!它破坏了线程安全。改变actor状态的唯一有效方法是消息处理。
    • 哦,是的,你是对的。谢谢!我对演员还是新手,有时会忘记一些基本的事情:-)。在改进的解决方案中,我现在在 onSuccess 中发送一条消息,并在接收中执行成为和取消存储。
    • 你能解释一下为什么你使用context.become(normalReceive)然后case _ => stash()。在become 之后,不是所有消息都会转到normalReceive 吗?他们中的任何人都会到达case _ => stash()吗?
    • 好的。所以演员被创造出来。在运行 preStart 并执行服务器请求。此时,actor 使用 initialReceive 方法,这意味着调用消息将被隐藏,因为它们匹配最后一个子句。然后一段时间后,future 完成并发送 InitializationDone 消息。然后演员切换到 normalReceive 并进行 unstash。现在所有“保存”的消息都将由您的 normalReceive 处理。
    • this comment。我想修复后你会从this comment 得到sendRequestToServer().onSuccess{ resp => self ! InitializationData(resp) }
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-27
    • 1970-01-01
    • 2017-02-23
    • 2021-05-17
    • 2023-04-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多