【问题标题】:How to solve this with Akka actors?如何用 Akka 演员解决这个问题?
【发布时间】:2015-10-25 10:23:20
【问题描述】:

不知道如何命名此线程,但将尝试用几行来解释问题。

我有一个命令需要计算所需日期范围的价格。为了计算它,系统需要单独获取每一天的价格(数据库、配置、缓存,从哪里来并不重要)。

我的建议是有一个 PriceRangeActor,它有一个 DailyPriceActor 池,并会向它们发送诸如 CalculateDailyPrice 之类的命令。

但是如何在 PriceRanceActor 中组装所有这些数据?

1。 有一些带有复杂键的大地图闻起来很香。那么如何确定范围是否完全计算?有没有更简单的方法?

2。 为每个命令创建新的 PriceRangeActor 并使用 ask pattern 查询 DailyPriceActors 列表?

【问题讨论】:

  • 我会选择第二个选项。在这种情况下,未来的组合比显式消息交换要容易得多。
  • 你为什么不创建一个演员链,每个演员都有每日价格,然后你沿着链条传递你的信息,每个演员都添加到结果中,最后一个 - 返回返回发件人 :-) 可能取决于范围有多大。
  • 谢谢各位。 @kukido 但那样我将失去并行计算的可能性。
  • 它看起来类似于书中有效的akka​​中的cameo pattern解决的问题。基本上都准备好后调用N个服务并聚合结果。我认为值得一看。

标签: akka reactive-programming


【解决方案1】:

因为您没有使用任何消息传递/排队,所以我建议您使用 Futures 而不是 Actors 作为您的并发抽象机制。 blog entry 提出了一个非常有说服力的论点,即 Actor 用于状态,Futures 用于计算。

使用 Future 或 Actor ?(这是一个 Future),您可以使用 Future.sequence 将所有单独的查询 Future 捆绑到一个 Future 中,该 Future 仅在所有子查询完成后才完成。

使用期货(推荐)

import scala.concurrent.Future

object Foo extends App {

  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  //expensive query function
  def fetchPrices(date : Date) : Prices = ???

  //the Dates to query Prices for
  val datesToQuery : Seq[Date] = ???

  import scala.concurrent.ExecutionContext.Implicits._

  def concurrentQuery(date : Date) : Future[Prices] = Future {fetchPrices(date)}      

  //launches a Future per date query, D Dates => D Futures
  //Future.sequence converts the D Futures into 1 Future
  val dates2PricesFuture : Future[PriceMap] = 
    Future.sequence(datesToQuery map concurrentQuery)
          .map(datesToQuery zip _)  
          .map(_.toMap)      

  dates2PricesFuture onSuccess { case priceMap : PriceMap =>
    //process the price data which is now completely available
  }

}//end object Foo

使用演员

import scala.concurrent.Future
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

object Foo extends App {          
  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  def fetchPrices(date : Date) : Prices = ???

  val datesToQuery : Seq[Date] = ???

  class QueryActor() extends Actor {
    def receive = { case date : Date => sender ! fetchPrices(date) }
  }

  implicit val as = ActorSystem()
  implicit val queryTimeout = Timeout(1000)

  import as.dispatcher

  def concurrentQuery(date : Date) : Future[Prices] =    
    ask(as actorOf Props[QueryActor],date).mapTo[Prices]

  val dates2PricesFuture : Future[PriceMap] = 
        Future.sequence(datesToQuery map concurrentQuery)
              .map(datesToQuery zip _)  
              .map(_.toMap)

  dates2PricesFuture onSuccess ... //same as first example

}//end object Foo

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-09-28
    • 1970-01-01
    • 1970-01-01
    • 2019-11-03
    • 2020-07-09
    • 2019-05-10
    • 2014-10-07
    相关资源
    最近更新 更多