【问题标题】:Scala parallel collectionsScala 并行集合
【发布时间】:2018-05-28 07:58:02
【问题描述】:

我非常天真地尝试使用 Scala .par,结果证明比非并行版本慢很多。对此有何解释?

注意:问题不是让它更快,而是要理解为什么这种对.par 的幼稚使用不会立即加快速度。

注意 2:计时方法:我用 N = 10000 运行了这两种方法。第一个在大约 20 秒内返回。 3分钟后我杀死了第二个。差远了。如果让它运行更长时间,我会遇到 Java 堆空间异常。

def pi_random(N: Long): Double = {
  val count = (0L until N * N)
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}

def pi_random_parallel(N: Long): Double = {
  val count = (0L until N * N)
    .par
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}

【问题讨论】:

  • 旁注:将变量名称更改为n。首字母大写被理解为 Scala 中的类或类型名称。

标签: scala concurrency scala-collections


【解决方案1】:

如果不做一些实际的分析很难确定,但我有两个理论:

首先,您可能会失去Range 类的一些好处,特别是接近零的内存使用量。当您执行(0L until N * N) 时,您创建了一个Range 对象,该对象是惰性的。它实际上并没有创建任何包含该范围内每个数字的对象。 map 也没有,我想。而sum 一次计算并添加一个数字,因此也几乎没有分配任何内存。

我不确定ParRange 是否同样如此。似乎它必须为每个拆分分配一些数量,并且在调用 map 之后,也许它可能必须将一些中间结果存储在内存中,因为“相邻”拆分等待另一个拆分完成。尤其是堆空间异常让我觉得是这样的。所以你会在 GC 等上浪费很多时间。

其次,对rng.nextDouble 的调用可能是迄今为止该内部函数中最昂贵的部分。但我相信 java 和 scala Random 类本质上都是单线程的。它们在内部同步和阻塞。因此,无论如何您不会从并行性中获得太多收益,实际上还会因开销而损失一些。

【讨论】:

  • 来自文档:“java.util.Random 的实例是线程安全的。但是,跨线程并发使用相同的 java.util.Random 实例可能会遇到争用,从而导致性能不佳。考虑改为使用多线程设计中的 ThreadLocalRandom。”
  • @JoeK 如果在最后一段中您的意思是Random 是瓶颈并且并行化无济于事,那么情况并非如此。用更粗的粒度将其并行化很容易实现一个数量级的加速。一般来说,涉及独立随机变量抽样的实验可以很好地并行化。
【解决方案2】:

每个任务的工作量不够,任务粒度太细。

创建每个任务都需要一些开销:

  • 必须创建一些代表任务的对象
  • 必须确保一次只有一个线程执行一项任务
  • 在某些线程空闲的情况下,必须调用一些作业窃取过程。

对于 N = 10000,您实例化了 100,000,000 个小任务。这些任务中的每一个几乎什么都不做:它生成两个随机数并执行一些基本的算术和一个 if 分支。创建任务的开销无法与每个任务所做的工作相提并论。

任务必须大得多,以便每个线程都有足够的工作要做。此外,如果您将每个 RNG 线程设为本地可能会更快,以便线程可以并行执行其工作,而无需永久锁定默认随机数生成器。

这是一个例子:

import scala.util.Random

def pi_random(N: Long): Double = {
  val rng = new Random
  val count = (0L until N * N)
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}

def pi_random_parallel(N: Long): Double = {
  val rng = new Random
  val count = (0L until N * N)
    .par
    .map { _ =>
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1) 1 else 0
    }
    .sum
  4 * count.toDouble / (N * N)
}


def pi_random_properly(n: Long): Double = {
  val count = (0L until n).par.map { _ =>
    val rng = ThreadLocalRandom.current
    var sum = 0
    var idx = 0
    while (idx < n) {
      val (x, y) = (rng.nextDouble(), rng.nextDouble())
      if (x*x + y*y <= 1.0) sum += 1
      idx += 1
    }
    sum
  }.sum
  4 * count.toDouble / (n * n)
}

这是一个小演示和时间安排:

def measureTime[U](repeats: Long)(block: => U): Double = {
  val start = System.currentTimeMillis

  var iteration = 0
  while (iteration < repeats) {
    iteration += 1
    block
  }

  val end = System.currentTimeMillis
  (end - start).toDouble / repeats
}

// basic sanity check that all algos return roughly same result
println(pi_random(2000))
println(pi_random_parallel(2000))
println(pi_random_properly(2000))

// time comparison (N = 2k, 10 repetitions for each algorithm)
val N = 2000
val Reps = 10
println("Sequential:  " + measureTime(Reps)(pi_random(N)))
println("Naive:       " + measureTime(Reps)(pi_random_parallel(N)))
println("My proposal: " + measureTime(Reps)(pi_random_properly(N)))

输出:

3.141333
3.143418
3.14142
Sequential: 621.7
Naive:      3032.6
My version: 44.7

现在并行版本比顺序版本快大约一个数量级(结果显然取决于内核数量等)。

我无法使用 N = 10000 对其进行测试,因为天真的并行化版本会以“超出 GC 开销”的错误使所有内容崩溃,这也说明创建微小任务的开销太大。

在我的实现中,我还展开了内部while:您只需要一个寄存器中的一个计数器,无需通过mapping 在范围上创建一个巨大的集合。


编辑:将所有内容替换为ThreadLocalRandom,现在您的编译器版本是否支持SAM 无关紧要,因此它也应该适用于2.11 的早期版本。

【讨论】:

  • 我无法运行您的代码。它给出:错误:(41、53)类型不匹配;发现:() => scala.util.Random 要求:java.util.function.Supplier[_ <: scala.util.random val rngs="ThreadLocal.withInitial[Random]"> new Random }`跨度>
  • 我使用的是 2.11.8。
  • @Frank 我添加了一个带有显式new java.util.function.Supplier 的版本,应该也适用于 2.11。
  • -2 现在,这变得有趣了吗?如果有人发现了错误:仍然欢迎建设性的批评。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-10-03
  • 1970-01-01
  • 2011-08-02
  • 2017-04-03
  • 2011-12-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多