【发布时间】:2017-11-25 16:14:54
【问题描述】:
背景
我一直在看Functional Programming in Scala的书,对Chapter 7: Purely functional parallelism的内容有一些疑问。
这是书中答案的代码:Par.scala,但我对其中的某些部分感到困惑。
这里是Par.scala的第一部分代码,代表Parallelism:
import java.util.concurrent._
object Par {
type Par[A] = ExecutorService => Future[A]
def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)
private case class UnitFuture[A](get: A) extends Future[A] {
def isDone = true
def get(timeout: Long, units: TimeUnit): A = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
}
def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
(es: ExecutorService) => {
val af = a(es)
val bf = b(es)
UnitFuture(f(af.get, bf.get))
}
def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) => es.submit(new Callable[A] {
def call: A = a(es).get
})
def lazyUnit[A](a: => A): Par[A] =
fork(unit(a))
def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)
def asyncF[A, B](f: A => B): A => Par[B] =
a => lazyUnit(f(a))
def map[A, B](pa: Par[A])(f: A => B): Par[B] =
map2(pa, unit(()))((a, _) => f(a))
}
-
Par[A]的最简单模型可能是ExecutorService => Future[A],而run只返回Future。 -
unit通过返回UnitFuture将常量值提升为并行计算,这是Future的简单实现,它只是包装了一个常量值。 -
map2将两个并行计算的结果与二进制函数相结合。 -
fork标记并发评估的计算。在运行强制之前,评估不会真正发生。这是它最简单,最自然的实现。虽然有问题,但还是先搁置一旁吧。 -
lazyUnit将其未评估的参数包装在Par中,并将其标记为并发评估。 -
run通过实际执行计算从Par中提取一个值。 -
asyncF将任何函数A => B转换为异步评估其结果的函数。
问题
fork 是一个让我很困惑的函数,因为它需要一个 惰性参数,稍后将在调用它时对其进行评估。然后我的问题更多是关于我们何时应该使用这个fork,即何时需要惰性评估以及何时需要直接获取值。
这是书中的一个练习:
练习 7.5 Hard:写这个函数,叫sequence。不需要额外的原语。不要调用 run。
def sequence[A](ps: List[Par[A]]): Par[List[A]]
这是答案(提供here)。
第一
def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
l.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))
上面的代码和下面的代码有什么不同:
def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
l.foldLeft[Par[List[A]]](unit(List()))((t, h) => map2(h, t)(_ :: _))
另外
def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
as match {
case Nil => unit(Nil)
case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
}
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
if (as.isEmpty) unit(Vector())
else if (as.length == 1) map(as.head)(a => Vector(a))
else {
val (l,r) = as.splitAt(as.length/2)
map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
}
}
在sequenceRight中,直接调用递归函数时使用fork。但是,在sequenceBalanced 中,fork 在整个函数体之外使用。
那么,上面的代码和下面的代码有什么区别(这里我们换了fork的地方):
def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = fork {
as match {
case Nil => unit(Nil)
case h :: t => map2(h, sequenceRight(t))(_ :: _)
}
}
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
if (as.isEmpty) unit(Vector())
else if (as.length == 1) map(as.head)(a => Vector(a))
else {
val (l,r) = as.splitAt(as.length/2)
map2(fork(sequenceBalanced(l)), fork(sequenceBalanced(r)))(_ ++ _)
}
最后,给定上面定义的sequence,我们有以下函数:
def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
val fbs: List[Par[B]] = ps.map(asyncF(f))
sequence(fbs)
}
我想知道,我是否也可以通过以下方式实现该功能,即应用开头定义的lazyUnit?这个实现 lazyUnit(ps.map(f)) 是懒惰的吗?
def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
lazyUnit(ps.map(f))
【问题讨论】:
标签: scala concurrency parallel-processing functional-programming lazy-evaluation