【问题标题】:functional parallelism and laziness in ScalaScala 中的函数并行性和惰性
【发布时间】:2017-11-25 16:14:54
【问题描述】:

背景

我一直在看Functional Programming in Scala的书,对Chapter 7: Purely functional parallelism的内容有一些疑问。

这是书中答案的代码:Par.scala,但我对其中的某些部分感到困惑。

这里是Par.scala的第一部分代码,代表Parallelism

import java.util.concurrent._

object Par {
  type Par[A] = ExecutorService => Future[A]

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit): A = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get))
    }

  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] {
      def call: A = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] =
    fork(unit(a))

  def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

  def map[A, B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a, _) => f(a))
}
  • Par[A] 的最简单模型可能是ExecutorService => Future[A],而run 只返回Future
  • unit 通过返回 UnitFuture 将常量值提升为并行计算,这是 Future 的简单实现,它只是包装了一个常量值。
  • map2 将两个并行计算的结果与二进制函数相结合。
  • fork 标记并发评估的计算。在运行强制之前,评估不会真正发生。这是它最简单,最自然的实现。虽然有问题,但还是先搁置一旁吧。
  • lazyUnit 将其未评估的参数包装在 Par 中,并将其标记为并发评估。
  • run 通过实际执行计算从 Par 中提取一个值。
  • asyncF 将任何函数 A => B 转换为异步评估其结果的函数。

问题

fork 是一个让我很困惑的函数,因为它需要一个 惰性参数,稍后将在调用它时对其进行评估。然后我的问题更多是关于我们何时应该使用这个fork,即何时需要惰性评估以及何时需要直接获取值。

这是书中的一个练习:

练习 7.5 Hard:写这个函数,叫sequence。不需要额外的原语。不要调用 run。

def sequence[A](ps: List[Par[A]]): Par[List[A]]

这是答案(提供here)。


第一

  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))

上面的代码和下面的代码有什么不同:

  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldLeft[Par[List[A]]](unit(List()))((t, h) => map2(h, t)(_ :: _))

另外

  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
    }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

sequenceRight中,直接调用递归函数时使用fork。但是,在sequenceBalanced 中,fork 在整个函数体之外使用。

那么,上面的代码和下面的代码有什么区别(这里我们换了fork的地方):

  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = fork {
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, sequenceRight(t))(_ :: _)
    }
  }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(fork(sequenceBalanced(l)), fork(sequenceBalanced(r)))(_ ++ _)
    }

最后,给定上面定义的sequence,我们有以下函数:

  def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    val fbs: List[Par[B]] = ps.map(asyncF(f))
    sequence(fbs)
  }

我想知道,我是否也可以通过以下方式实现该功能,即应用开头定义的lazyUnit?这个实现 lazyUnit(ps.map(f)) 是懒惰的吗?

  def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
    lazyUnit(ps.map(f))

【问题讨论】:

    标签: scala concurrency parallel-processing functional-programming lazy-evaluation


    【解决方案1】:

    我没有完全理解你的疑问。但我发现以下解决方案存在一个主要问题,

    def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
      lazyUnit(ps.map(f))
    

    要了解问题,请查看def lazyUnit

    def fork[A](a: => Par[A]): Par[A] =
      (es: ExecutorService) => es.submit(new Callable[A] {
        def call: A = a(es).get
      })
    
    def lazyUnit[A](a: => A): Par[A] =
      fork(unit(a))
    

    所以...lazyUnit 接受=> A 类型的表达式并将其提交给ExecutorService 以进行评估。并将此并行计算的包装结果返回为Par[A]

    parMap 中,对于ps: List[A] 的每个元素,我们不仅要使用函数f: A => B 评估相应的映射,而且还必须对in parallel 进行这些评估。

    但是我们的解决方案lazyUnit(ps.map(f)) 会将整个{ ps.map(f) } 评估作为单个任务提交给我们的ExecutionService。这意味着我们没有并行执行。

    我们需要做的是确保对于ps: [A] 中的每个元素a,函数f: A => B 作为我们ExecutorService 的单独任务执行。

    现在,正如我们从实现中了解到的那样,我们可以通过使用 lazyUnit(exp) 来运行 exp: => A 类型的表达式来获得 result: Par[A]

    因此,我们将对ps: List[A] 中的每个a: A 执行此操作,

    val parMappedTmp = ps.map( a => lazyUnit(f(a) ) )
    
    // or
    
    val parMappedTmp = ps.map( a => asyncF(f)(a) )
    
    // or
    
    val parMappedTmp = ps.map(asyncF(f))
    

    但是,现在我们的 parMappedTmpList[Par[B]] 而我们需要 Par[List[B]]

    所以,你需要一个具有以下签名的函数来获得你想要的,

    def sequence[A](ps: List[Par[A]]): Par[List[A]]
    

    一旦你拥有它,

    val parMapped = sequence(parMappedTmp)
    

    【讨论】:

    • 我不认为你必须把 () => f(a) 代替 f(a) 用于懒惰的论点。比如像def f(p: => Int, eval : Boolean) = if (eval) println(p)这样的函数,可以像f(3/0, false)那样调用,而不是f(() => 3/0, false)
    • 我认为这是惰性参数评估的目的。
    • 是的。我认为你是正确的。但其余的解释仍然成立。
    • 您能否修改答案的第一部分?然后让我们进一步讨论其余部分并尝试说清楚。
    • 谢谢。您的回答回答了我的终于问题。您能否同时解决首先另外的问题?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-06-24
    • 2017-04-03
    • 2012-04-27
    • 2020-12-28
    • 2019-12-18
    • 1970-01-01
    • 2016-01-22
    相关资源
    最近更新 更多