【问题标题】:how to use future value inside an akka actor如何在 akka 演员中使用未来价值
【发布时间】:2021-01-14 21:30:58
【问题描述】:

我想在父子层次结构中使用 akka 演员,父母可以将最终计算结果发送到可能/可能不是演员的调用代码这是我的用例

parent->child->childManager

childManager 有三个工人演员(3 个孩子) childManager (worker1,worker2,worker3)

现在worker1获取值处理它并将其响应值发送给worker2

  • worker1->处理值并将结果发送给worker2
  • worker2->处理worker1结果并将worker2结果发送给worker3
  • worker3->处理worker2结果并将最终计算值发送到 子管理器
  • childManager 将结果发送给孩子
  • 孩子将结果发送给父母

这就是我想要做的事情

final case class GetValue(value:String)
final case class sendToChildManager(value:String)
final case class sendToChild(value:String)

final case class processToken(value:String)
final case class processPublicKey(value:String)
final case class processUserDeatils(accessToken:string,publicKey:PublicKey)

 class worker1 extends Actor {
  def receive {
    case ProcessToken(token)=>
    val request = httpRequest(token)
    request.onComplete{
      case Success(accessToken)=>
        //generate access token 
        sender ! accessToken
      case Failure(e)=>throw e
   }
  }
}

class worker2 extends Actor {
  def receive {
    case processPublicKey(token)=>
    val request=HttpRequest(token)//based on accessToken compute publicKey 
    request.onComplete {
      case Suceess(publicKey)=>
        // calculate publicKey from accessToken 
        sender ! publicKey
      case Failure(e)=>throw e
    }
  }
}

class worker3 extends Actor {

  def receive {
    case processUserDeatils(accessToken,publicKey)=>
    val request = HttpRequest(accessToken,publicKey)
    request.onCompelete {
      case Success(value)=>
        //process the resulted value got userInfo as a result 
        sender ! UserInfo
      case Failure(e)=> throw e
    }
  }
}

class ChildManager extends Actor {

  def receive {
    case sendtoChildManager(value)=>
    val worker1=context.actorOf//
    val worker2=context.actorOf//
    val worker3=context.actorOf//

    val futureToken = ask(worker1,processToken)
    futureToken.onComplete {
      case Sucess(token)=>
        val futurePublickKey = ask(worker2,processPublicKey(token))
        futurePublickKey.onComplete {
          case Sucess(publicKey)=>
            val futureVerified=ask(worker3,processUserDeatils(token,publicKey))
            futureVerified.pipeTo(sender)
          case Failure(e)=> throw e
        }
      case Failure(e)=>throw e
    }
  }
}

 class child extends Actor {
  val childMnager = context.actorOf//
  def receive {
    case sendToChild(value)=>
    childMnager.forward(sendtoChildManager(value))
  }

  class parent extends Actor {
    val child = context.actorOf()//

    def receive {
      case Getvalue(value)=>
      child.forward(sendtoChild(value))
    }
  }
}

object Main {
  def main {
    val system =ActorSystem("myActorsystem")
    val parent=system.actorOf(Props[Parent],"parent")
    val future=ask(parent,GetValue("token-id"))
    future.onComplete {
      case Success(result)=>Complete("user information is",result)
      case Failure(e) Complete("InternelserverError",e)
    }
  }
}

我已经读到不建议在演员内部使用 onComplete 它应该是发送者的 pipeTo

警告 当使用未来的回调,如 onComplete,或 map,如 thenRun,或 thenApply 内部 Actor 时,您需要小心避免关闭包含 Actor 的引用,即不要从回调中调用方法或访问封闭 Actor 上的可变状态。这会破坏actor封装,并可能引入同步错误和竞争条件,因为回调将同时调度到封闭的actor。不幸的是,目前还没有一种方法可以在编译时检测这些非法​​访问。另请参阅:Actors 和共享可变状态警告 当使用未来的回调,如 onComplete,或 map,如 thenRun,或 thenApply 内部 Actor 时,您需要小心避免关闭包含 Actor 的引用,即不要从回调中调用方法或访问封闭 Actor 上的可变状态。这会破坏actor封装,并可能引入同步错误和竞争条件,因为回调将同时调度到封闭的actor。不幸的是,目前还没有一种方法可以在编译时检测这些非法​​访问。另请参阅:Actor 和共享可变状态 https://doc.akka.io/docs/akka/current/actors.html#ask-send-and-receive-future

但我想在一个actor内部进行所有处理,以便父actor将最终计算值返回给调用代码 我怎样才能做到这一点?
我正在使用 akka http

**Edit**

我的问题是我如何使用未来内部演员,因为它建议不要使用 .map 或 onComplete 或等待,我不想将多个未来发送到调用代码然后计算我想要发送的最终结果调用代码的最终计算结果

【问题讨论】:

  • 这个问题和你问的previous有什么区别?
  • 这很难阅读并且有错误(例如“案例成功”)。请以适当的格式发布工作代码。
  • @Tim 我已经重新格式化了代码

标签: scala akka future akka-http


【解决方案1】:

如果您在 actor 内部使用异步 api,请不要使用 'sender' 属性。只要您在接收函数中编写同步代码,参与者就会提供互斥。但是,当您创建未来时,您会创建一个可以在另一个线程中执行的新异步任务,同时消息仍在涌入演员邮箱并且“发件人”属性发生变化。

你有两个选择:

  1. 使用 Akka Typed。自从 Akka Typed 准备好投入生产以来,Akka 已经有了很大的改进。我推荐这个选项。使用 Akka Typed,您必须在消息的数据类型中包含引用。

  2. 为每条执行 Http 请求的消息创建一个匿名 Actor,并保存原始发送者 Actor 引用。

【讨论】:

  • 我的问题是我如何使用未来内部演员,因为它建议不要使用 .map 或 onComplete 或等待,我不想将多个未来发送到调用代码然后计算最终结果结果我想将最终计算的结果发送给调用代码
  • 看看这个:jaxenter.com/…,虽然老了,但是经典的actors api没变。
  • 不确定此建议来自何处,但只要您不访问Future 内的外部状态(如sender)就可以。最好的办法是只发送一条消息并在 receive 方法中同步完成处理,这似乎是您已经在做的事情。
  • 它来自有效 Akka 书的作者 jamie Allen。他描述的问题是相同的,尽管情况是一个演员必须调用另一个演员。它可以外推到未来的情况
  • 谁能提供一个正确的例子
【解决方案2】:

您不能在Future 中使用sender,因为它是可变的Actor 状态。 sender 的值可以在每次调用 receive 方法时更改,并且在 Future 完成时可能会有所不同。

解决方案是捕获sender 的值并将捕获的值用于Future

def receive = {
  case ProcessToken(token)=>
    val replyTo = sender // Capture current value of sender
    val request = httpRequest(token)

    request.onComplete{
      case Success(accessToken)=>
        //generate access token 
        replyTo ! accessToken
      case Failure(e) =>
        throw e
  }
}

类型化的actors不支持sender,所以ProcessToken消息本身需要有一个replyTo字段。此消息不是参与者状态,可以在 Future 内安全访问。

【讨论】:

  • 我应该继续在演员内部使用期货怎么样?在演员内部
  • 这是我的实际问题,而不是发件人参考
  • 是的,你应该继续使用onComplete,就像我在这个答案中所做的那样。
猜你喜欢
  • 2020-05-01
  • 2013-06-16
  • 1970-01-01
  • 2012-11-28
  • 2011-11-17
  • 1970-01-01
  • 1970-01-01
  • 2011-07-08
  • 2018-06-15
相关资源
最近更新 更多