【问题标题】:Synchronizing on function parameter for multithreaded memoization同步函数参数以进行多线程记忆
【发布时间】:2014-08-10 19:04:34
【问题描述】:

我的核心问题是:如何在对象实例和方法参数的组合上实现方法中的同步?

这是我的情况的详细信息。我正在使用以下代码来实现记忆,改编自this answer

/**
 * Memoizes a unary function
 * @param f the function to memoize
 * @tparam T the argument type
 * @tparam R the result type
 */
class Memoized[-T, +R](f: T => R) extends (T => R) {

  import scala.collection.mutable

  private[this] val cache = mutable.Map.empty[T, R]

  def apply(x: T): R = cache.getOrElse(x, {
    val y = f(x)
    cache += ((x, y))
    y
  })
}

在我的项目中,我正在记忆 Futures 以​​对异步 API 调用进行重复数据删除。当使用for...yield 映射使用标准ExcecutionContext 创建的结果期货时,这工作得很好,但是当我升级到Scala Async 以便更好地处理这些期货时。但是,我意识到库使用的多线程允许多个线程进入apply,从而破坏了记忆,因为async 块全部并行执行,在cache 之前输入“orElse”thunk 可以用新的@ 更新987654331@。

为了解决这个问题,我将主要的应用函数放在 this.synchronized 块中:

def apply(x: T): R = this.synchronized {
  cache.getOrElse(x, {
    val y = f(x)
    cache += ((x, y))
    y
  })
}

这恢复了记忆中的行为。缺点是这将阻止具有不同参数的调用,至少在创建 Future 之前。我想知道是否有办法在Memoized 实例和x 参数的值与apply 的组合上设置更细粒度的同步。这样一来,只有会被删除重复数据的调用才会被阻止。

作为旁注,我不确定这是否真的对性能至关重要,因为同步块将在创建并返回 Future 后释放(我认为?)。但如果有任何我没有想到的问题,我也想知道。

【问题讨论】:

  • 你考虑过使用演员吗?然后客户端代码永远不会阻塞响应,并且只有一个线程会触及cache
  • 我没有想到这一点,但根据我所知甚少,这听起来像是一个更惯用的解决方案。您能否解释更多或提供一个链接,说明如何从控制器代码中使用 API 以进行 Web 响应?
  • 好吧,根据我的理解,多线程对建立记忆没有多大帮助:stackoverflow.com/a/20462893/2073130

标签: multithreading scala asynchronous memoization thread-synchronization


【解决方案1】:

Akka Actor 与 future 相结合提供了一种强大的方法来包装可变状态而不会阻塞。下面是一个如何使用 Actor 进行记忆的简单示例:

import akka.actor._
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent._
import scala.concurrent.duration._

class Memoize(system: ActorSystem) {
  class CacheActor(f: Any => Future[Any]) extends Actor {
    private[this] val cache = scala.collection.mutable.Map.empty[Any, Future[Any]]

    def receive = {
      case x => sender ! cache.getOrElseUpdate(x, f(x))
    }
  }

  def apply[K, V](f: K => Future[V]): K => Future[V] = {
    val fCast = f.asInstanceOf[Any => Future[Any]]
    val actorRef = system.actorOf(Props(new CacheActor(fCast)))
    implicit val timeout = Timeout(5.seconds)
    import system.dispatcher
    x => actorRef.ask(x).asInstanceOf[Future[Future[V]]].flatMap(identity)
  }
}

我们可以像这样使用它:

val system = ActorSystem()
val memoize = new Memoize(system)
val f = memoize { x: Int =>
  println("Computing for " + x)
  scala.concurrent.Future.successful {
    Thread.sleep(1000)
    x + 1
  }
}
import system.dispatcher
f(5).foreach(println)
f(5).foreach(println)

而“Computing for 5”只会打印一次,而“6”会打印两次。

有一些看起来很吓人的 asInstanceOf 调用,但它是完全类型安全的。

【讨论】:

  • 赞成,因为我认为这是一个很棒的工具。我仍在考虑采用它是否有意义,因为它需要承担很多额外的样板。
  • 这也将缓存失败的期货。
猜你喜欢
  • 2015-03-20
  • 1970-01-01
  • 2013-08-20
  • 2013-01-15
  • 1970-01-01
  • 1970-01-01
  • 2012-04-20
  • 1970-01-01
  • 2019-01-21
相关资源
最近更新 更多