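【Posted】: 2019-01-27 17:28:37
【Question】:
While working through the book "Functional Programming in Scala" by Paul Chiusano and Runar Bjarnason (Chapter 7 - Purely functional parallelism), I came across the following scenario.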
package fpinscala.parallelism

import java.util.concurrent._
import language.implicitConversions

object Par {
  type Par[A] = ExecutorService => Future[A]

  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation
  // of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's
  // always done and can't be cancelled. Its `get` method simply returns the value that we gave it.
  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit) = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design
  // choice of having `fork` be the sole function in the API for controlling parallelism. We can always
  // do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      // This implementation of `map2` does _not_ respect timeouts. It simply passes the
      // `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`,
      // applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need
      // a new `Future` implementation that records the amount of time spent evaluating `af`, then
      // subtracts that time from the available time allocated for evaluating `bf`.
      UnitFuture(f(af.get, bf.get))
    }

  // This is the simplest and most natural implementation of `fork`, but there are some problems with
  // it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this
  // blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`,
  // this implies that we're losing out on some potential parallelism. Essentially, we're using two
  // threads when one should suffice. This is a symptom of a more serious problem with the
  // implementation, and we will discuss this later in the chapter.
  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get
}
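The comment on `map2` describes how timeouts could be respected: record the time spent waiting on the first future and subtract it from the budget for the second. Below is a minimal sketch of that idea, not the code from the listing above. The names `TimedMap2Sketch`, `Map2Future` and `delayed` are hypothetical and exist only for illustration.

```scala
import java.util.concurrent._

object TimedMap2Sketch {
  type Par[A] = ExecutorService => Future[A]

  // Hypothetical Future that combines two futures and shares one timeout budget between them.
  final class Map2Future[A, B, C](a: Future[A], b: Future[B], f: (A, B) => C) extends Future[C] {
    @volatile private var cache: Option[C] = None

    def isDone(): Boolean = cache.isDefined
    def isCancelled(): Boolean = a.isCancelled() || b.isCancelled()
    def cancel(evenIfRunning: Boolean): Boolean = {
      // Attempt to cancel both underlying futures, not just the first one.
      val cancelledA = a.cancel(evenIfRunning)
      val cancelledB = b.cancel(evenIfRunning)
      cancelledA || cancelledB
    }
    def get(): C = compute(Long.MaxValue)
    def get(timeout: Long, units: TimeUnit): C = compute(units.toNanos(timeout))

    private def compute(budgetNanos: Long): C = cache match {
      case Some(c) => c
      case None =>
        val start = System.nanoTime()
        val ar = a.get(budgetNanos, TimeUnit.NANOSECONDS)
        val spent = System.nanoTime() - start
        // Whatever was spent waiting on `a` is no longer available for `b`.
        val br = b.get(budgetNanos - spent, TimeUnit.NANOSECONDS)
        val c = f(ar, br)
        cache = Some(c)
        c
    }
  }

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] =
    es => new Map2Future(pa(es), pb(es), f)

  // Hypothetical helper: a Par whose value only becomes available after `ms` milliseconds.
  def delayed[A](ms: Long)(a: => A): Par[A] =
    es => es.submit(new Callable[A] {
      def call(): A = { Thread.sleep(ms); a }
    })
}
```

With this sketch, calling `get(200, TimeUnit.MILLISECONDS)` on the result of `map2(delayed(10)(1), delayed(2000)(2))(_ + _)` should throw a `TimeoutException`, whereas the `UnitFuture`-based `map2` blocks inside the function itself and never consults the timeout.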
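You can find the original code on GitHub here. For the java.util.concurrent documentation, see here.
My concern is the implementation of fork. In particular, fork is said to deadlock when the thread pool is too small.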
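To make the "pool too small" case concrete, here is a minimal, self-contained sketch that re-declares just enough of the API to run on its own. `ForkDeadlockSketch`, `Done` and `tryRun` are hypothetical names, and a timeout is used so that a stuck computation shows up as `None` instead of hanging forever.

```scala
import java.util.concurrent._

object ForkDeadlockSketch {
  type Par[A] = ExecutorService => Future[A]

  // A Future that is already complete (plays the role of UnitFuture).
  private final case class Done[A](value: A) extends Future[A] {
    def isDone(): Boolean = true
    def get(): A = value
    def get(timeout: Long, units: TimeUnit): A = value
    def isCancelled(): Boolean = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = _ => Done(a)

  // Same shape as the fork under discussion: the outer task blocks on the inner one.
  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call(): A = a(es).get()
    })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  // Runs fork(lazyUnit(42 + 1)) on a fixed pool with `threads` threads and waits at most
  // `timeoutMs` milliseconds. Returns None if no result arrived in time.
  def tryRun(threads: Int, timeoutMs: Long): Option[Int] = {
    val es = Executors.newFixedThreadPool(threads)
    try {
      Some(fork(lazyUnit(42 + 1))(es).get(timeoutMs, TimeUnit.MILLISECONDS))
    } catch {
      case _: TimeoutException => None
    } finally {
      es.shutdownNow() // interrupts the blocked worker so the JVM can exit
    }
  }
}
```

Run as a standalone program, `tryRun(1, 500)` should give `None`: the only worker thread is occupied by the outer `Callable`, which waits for an inner task that stays in the queue. `tryRun(2, 2000)` should give `Some(43)`, because the second thread can pick up the inner task.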
Consider the following example:
val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)
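I would not expect this example to end up in a deadlock, since there are two threads. Yet it does deadlock on my computer when I run it in the Scala REPL. Why is that?
The output when initializing the ExecutorService is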
es: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
Is pool size = 0 correct here? In other words, is this a problem of not understanding java.util.concurrent._, or of not understanding the Scala part?
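On the java.util.concurrent side, a `ThreadPoolExecutor` by default starts its worker threads only when tasks arrive, so a pool size of 0 right after construction is expected and does not indicate a misconfigured pool. A minimal sketch to check this follows; `PoolSizeSketch` and `poolSizes` are hypothetical names, and the cast assumes that `Executors.newFixedThreadPool` returns a `ThreadPoolExecutor`, as the REPL output above suggests.

```scala
import java.util.concurrent._

object PoolSizeSketch {
  // Returns the pool size before any task is submitted and after one task has completed.
  def poolSizes(): (Int, Int) = {
    val es = Executors.newFixedThreadPool(2)
    try {
      // Assumption: the fixed pool is backed by a ThreadPoolExecutor.
      val pool = es.asInstanceOf[ThreadPoolExecutor]
      val before = pool.getPoolSize()
      es.submit(new Callable[Int] { def call(): Int = 43 }).get()
      val after = pool.getPoolSize()
      (before, after)
    } finally {
      es.shutdown()
    }
  }
}
```

`poolSizes()` should return `(0, 1)`: no worker exists until the first task is submitted, and a single task only causes one of the two possible workers to be created.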
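【Comments】:
-
How do you run the example? When I run it as a main method, it prints 43. But when I run it from an IntelliJ Worksheet, for example, it prints nothing.
-
@ygor, this sounds surprising to me, but the problem is reproducible only when the code is run in the interactive Scala console. It works for me every time if I run it as a standalone program, but it fails every time in the REPL. I spent some time investigating it yesterday and I am still puzzled.
-
Hi @ygor and @SergGr, thanks! I did indeed run it in the console and did not try a main method. Out of curiosity: when you set the number of threads to 1 in val es: ExecutorService = Executors.newFixedThreadPool(2), do you get a deadlock in the main method?
-
I just checked, and with a main method it behaves as expected.
-
@clog14, IMHO the interesting part is not that it works as advertised as a standalone application, but that it does not work in the REPL. I am still curious why that is.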
Tags: java scala parallel-processing java.util.concurrent scala-repl