【问题标题】:Akka, futures and critical sectionsAkka,期货和临界区
【发布时间】:2014-08-08 08:18:16
【问题描述】:

假设我们有一个 Akka actor,它以 var 的形式维护一个内部状态。

class FooActor extends Actor {
  private var state: Int = 0

  def receive = { ... }
}

假设接收处理程序调用一个返回未来的操作,我们使用调度程序将其映射为上下文执行程序,最后我们设置一个 onSuccess 回调来改变参与者状态。

import context.dispatcher
def receive = {
  case "Hello" => requestSomething() // asume Future[String]
    .map(_.size)
    .onSuccess { case i => state = i }
}

onSuccess 回调改变actor的状态是否是线程安全的,即使使用actor调度器作为执行上下文?

【问题讨论】:

    标签: scala concurrency akka future


    【解决方案1】:

    不,不是(akka 2.3.4 documentation)。

    在这种情况下,您要做的就是向自己发送一条消息以更改状态。如果您需要订购,您可以使用 stash 并成为。像这样的

    import akka.actor.{Stash,Actor}
    import akka.pattern.pipe
    case class StateUpdate(i:int)
    class FooActor extends Actor with Stash{
      private var state: Int = 0
      def receive = ready
      def ready  = {
        case "Hello" => requestSomething() // asume Future[String]
          .map(StateUpdate(_.size)) pipeTo self
          become(busy)
      } 
      def busy {
         case StateUpdate(i) => 
           state=i
           unstashAll()
           become(ready)
         case State.Failure(t:Throwable) => // the future failed
         case evt =>
           stash()   
      }
    }
    

    当然,这是一个简单的实现,您可能需要处理超时和一些东西以避免让您的演员卡住。

    如果您的州不需要订购保证:

    case class StateUpdate(i:int)
    class FooActor extends Actor with Stash{
      private var state: Int = 0
      def receive = {
        case "Hello" => requestSomething() // asume Future[String]
          .map(StateUpdate(_.size)) pipeTo self
        case StateUpdate(i) => state=i
      } 
    

    但是演员状态可能不是最后收到的字符串的长度

    【讨论】:

    • 非常感谢!我一直在查看文档,但没有看到该部分。
    【解决方案2】:

    为了支持 Jean 的回答,以下是文档中的示例:

    class MyActor extends Actor {
        var state = ...
        def receive = {
            case _ =>
            //Wrongs
    
            // Very bad, shared mutable state,
            // will break your application in weird ways
            Future {state = NewState}
            anotherActor ? message onSuccess {
                r => state = r
            }
    
            // Very bad, "sender" changes for every message,
            // shared mutable state bug
            Future {expensiveCalculation(sender())}
    
            //Rights
    
            // Completely safe, "self" is OK to close over
            // and it's an ActorRef, which is thread-safe
            Future {expensiveCalculation()} onComplete {
                f => self ! f.value.get
            }
    
            // Completely safe, we close over a fixed value
            // and it's an ActorRef, which is thread-safe
            val currentSender = sender()
            Future {expensiveCalculation(currentSender)}
        }
    }
    

    【讨论】:

    • 谢谢,我已经在我的回答中添加了相关链接:)
    猜你喜欢
    • 2015-07-07
    • 1970-01-01
    • 2011-10-05
    • 2013-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多