【问题标题】:What is the best way to defer a Message?推迟消息的最佳方法是什么?
【发布时间】:2014-10-31 08:30:55
【问题描述】:

我有一个可以接收“getItems”消息的演员“ItemProvider”。 ItemProvider 管理项目的项目。所以我可以有几个“getItems”消息为项目 A 请求项目,而其他“getItems”消息为项目 B 请求项目。

“itemProvider”第一次收到这样的消息时,它需要调用服务来实际获取项目 (这可能需要一分钟服务返回一个未来,所以它不会阻止演员)。在此等待期间,其他“getItems”消息可能会到达。

项目“ItemProvider”缓存它从服务接收到的“Items”。 因此,在 1 分钟的加载时间之后,它可以立即提供商品。

我很确定“ItemProvider”应该使用 Akka 的成为功能。但是它应该如何处理它不能立即服务的客户呢?

我可以想到以下选项:

  1. ItemProvider 持有一个列表pendingMessages。并且它不能提供的消息被添加到这个列表中。当 ItemProvider “准备好”时,它将处理待处理的客户端

  2. ItemProvider 将消息发送回其父级。并且家长会重新发出消息

  3. ItemProvider 使用调度程序。并在未来再次得到消息。

  4. 也许不使用 become 而是使用 AbstractFSM 类?

    有人知道实现 ItemProvider 的最佳 Akka 方法吗?

【问题讨论】:

  • 我看起来可以为他们服务,因为您使用的是Future,但如果您在很短的时间内收到相同的请求,这可能是多余的工作。您需要一种方法来告诉后续请求您已经有了响应。所以,是的,您可能需要从请求到未来的Map,当有新请求到来时,请检查Map,如果该请求有Future,请发送Future也响应这个客户。
  • 我认为你不能使用become。当你有一个状态机时你需要become,并且你有多个状态机。每个单独的请求都有一个状态机,有两种状态:正在获取和已获取。
  • 我完全是 akka 的初学者。但我不明白为什么我不能将客户放在待处理列表中。当物品准备好时,Itemprovider 可以向所有等待的客户发送他们的结果。在我看来这是一个不错的选择
  • 啊,你也可以。当然。
  • @jack,如果您不介意它不在 java 中,我可以编写一个基于 scala 的解决方案来解决您的问题。如果你想看,请告诉我。

标签: java scala akka


【解决方案1】:

看看 Akka 的Stash feature (usage example)。 下面是(未经测试的)代码,用于在从服务器请求实际项目时存储 getItems 消息,然后在服务器请求完成后处理所有 getItems 消息

import akka.actor.{Actor, Stash}

class ItemProviderActor extends Actor with Stash {
  private[this] itemsOpt : Option[Items] = None

  def receive = processing

  def processing: Receive = {
    case m:GetItems => {
      if(itemsOpt.nonEmpty) {
        // respond immediately
        itemsOpt.foreach(sender() ! _)
      }
      else {
        // Stash current request and initiate cache update
        context.become(retrivingData)
        stash()

        // Will send future results of item retrieval as a message to self
        retrieveItems().pipeTo(self)
      }
    }
  }

  def retrivingData: Receive = {
    case m: Items => 

      // items are retrieved, update cache
      itemsOpt = Option(m)

      // resume normal processing
      context.become(processing)

      // put all pending item requests back to actor's message queue
      unstashAll()


    case m:GetItems => 
      // busy retrieving items, store request to serve later
      stash()
  }

  def retrieveItems() : Future[Items] = {
    ???
  }

}

【讨论】:

  • 这看起来很像我需要的。我明天试试!非常感谢
【解决方案2】:

您将在下面找到一种可能的方式来构建参与者以满足您的要求。在这个解决方案中,我将使用每个项目的参与者实例来缓存特定于该项目的项目。然后,我将使用一个路由actor,它将接收获取项目项目的请求并委托给处理该项目缓存的正确子actor。在实际的缓存参与者中,您会看到我使用 stash/unstash 来处理延迟请求,直到加载了要缓存的项目(我在代码中对此进行了模拟)。代码如下:

import akka.actor._
import scala.concurrent.Future
import akka.pattern._
import concurrent.duration._ 
import akka.util.Timeout

class ItemProviderRouter extends Actor{
  import ItemProvider._

  def receive = {
    case get @ GetItems(project) =>

      //Lookup the child for the supplied project.  If one does not
      //exist, create it
      val child = context.child(project).getOrElse(newChild(project))
      child.forward(get)
  }

  def newChild(project:String) = {
    println(s"creating a new child ItemProvider for project $project")
    context.actorOf(Props[ItemProvider], project)
  }

}

object ItemProvider{
  case class GetItems(project:String)
  case class Item(foo:String)
  case class LoadedItems(items:List[Item])
  case object ClearCachedItems
  case class ItemResults(items:List[Item])
}

class ItemProvider extends Actor with Stash{
  import ItemProvider._  

  //Scheduled job to drop the cached items and force a reload on subsequent request
  import context.dispatcher
  context.system.scheduler.schedule(5 minutes, 5 minutes, self, ClearCachedItems)

  def receive = noCachedItems

  def noCachedItems:Receive = {
    case GetItems(project) =>
      stash()      
      fetchItems(project)
      context.become(loadingItems)


    case ClearCachedItems =>
      //Noop
  }

  def loadingItems:Receive = {
    case get:GetItems => stash

    case LoadedItems(items) =>
      println(s"Actor ${self.path.name} got items to cache, changing state to cachedItems")
      context.become(cachedItems(items))
      unstashAll()    

    case ClearCachedItems => //Noop      
  }

  def cachedItems(items:List[Item]):Receive = {
    case GetItems(project) =>
      sender ! ItemResults(items)

    case ClearCachedItems =>
      println("Clearing out cached items")
      context.become(noCachedItems)       

    case other =>
      println(s"Received unexpected request $other when in state cachedItems")          
  }

  def fetchItems(project:String){
    println(s"Actor ${self.path.name} is fetching items to cache")

    //Simulating doing something that results in a Future
    //representing the items to cache    

    val fut = Future{
      Thread.sleep(5000)
      List(Item(s"hello $project"), Item(s"world $project"))
    }

    fut.map(LoadedItems(_)).pipeTo(self)
  }
}

然后进行测试:

object ItemProviderTest extends App{
  import ItemProvider._
  val system = ActorSystem("test")
  import system.dispatcher
  val provider = system.actorOf(Props[ItemProviderRouter])

  implicit val timeout = Timeout(10 seconds)
  for(i <- 1 until 20){
    val afut = provider ? GetItems("a")
    val bfut = provider ? GetItems("b")

    afut onSuccess{
      case ItemResults(items) => println(s"got items list of $items for project a")
    }

    bfut onSuccess{
      case ItemResults(items) => println(s"got items list of $items for project b")
    }    
  }
} 

为简单起见,我使用实际的 Actor 来执行路由,而不是自定义路由器,但如果性能(即邮箱命中)对您很重要,您也可以在此处实现自定义路由器。

【讨论】:

  • 将 fetchItems 放入 fetchItemsLoader actor 是否有意义?我会有所收获吗?
  • @jack,当然,如果您想将加载部分与缓存和服务部分分开,那么我不明白为什么不这样做。如果是这种情况,那么您可以在那里使用ask (?) 并像我在示例中所做的那样获取未来并通过管道返回自我,或者使用tell (!) 并直接等待响应而不需要未来。跨度>
【解决方案3】:

客户端应按计划重新发送幂等请求,直到收到满意的答案或超时。

是否需要更多的 ItemProvider 或 ItemProvider 批量请求取决于被查询资源的性质。如果您每分钟只能发出 1 个请求,则应在 ItemProvider 中对请求进行批处理。但是,客户有责任确保它继续请求答复,直到满意为止。它不应该依赖 ItemProvider 来可靠地记住请求。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-15
    • 1970-01-01
    • 1970-01-01
    • 2014-05-14
    相关资源
    最近更新 更多