【问题标题】:How can I use Scala concurrent programming to do computations in parallel?如何使用 Scala 并发编程进行并行计算?
【发布时间】:2010-12-13 05:19:13
【问题描述】:

我正在尝试使用 Scala 来查找产生最大返回值的函数的参数,并且我想并行执行此操作。所以对于这个函数:

def f(i: Long): Double = {
  // do something with i and return a double
}  

我想在传递给函数 f 时给出最大值的范围 (0, x) 上找到输入参数 i。这是我目前所拥有的:

import scala.concurrent.ops._

def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  replicate(0, xs.length) { i => results(i) = f(xs(i)) }
  results
}

var results = parMap(i => (f(i), i), List.range(0, i)).max

它可能工作正常,但我收到 java.lang.OutOfMemoryError: Java heap space 错误。对于我正在处理的整个结果集太大而无法放入内存的问题,因此它需要丢弃不如迄今为止最好的结果。如果我使列表范围足够小以使其全部适合内存,我的结果数组(在它调用 max 方法之前)看起来像这样:

Array(null, null, (-Infinity,2), (-Infinity,3), null, (-Infinity,5), (-Infinity,6), (-Infinity,7), (-Infinity,8), (-22184.3237904591,9), null, (-22137.315048628963,11)...

-Infinity 值对于我正在做的事情是正常的,但空值不是。每次运行它都会得到不同的空值,所以这是随机的。这就像复制方法在某些函数调用上“放弃”并给出 null。

注意我使用的是 Scala 2.8.1。

另外,在我看来,关于 Scala 和并行计算的准确文档很难获得。我想了解更多,所以我可以自己解决像这样的问题。谁能推荐一个我可以学习的可靠资源?

【问题讨论】:

  • 欢迎来到 SO。请注意“......它不起作用......”风格的陈述,因为它们没有提供太多的工作(可以编辑问题以包含错误消息/症状等细节 - 它可能只是 2.7 和 2.8 之间的区别)。
  • 看看 Scala 2.9 并行集合 stackoverflow.com/q/3740505/203968,这里有 2.9 scala-lang.org/node/212/distributions
  • Scala 2.9 也有一个 maxBy 方法,所以你可以只写 (0 until i).par.maxBy(f)

标签: scala


【解决方案1】:

我没有完全跟上 2.9 并行集合的速度,我不确定 concurrent.ops 是否维护得很好,但在我看来,您的任务非常适合 2.8 中的未来:

// Setup--you want to use longs, so you can't use range
val x = 4000000000L  // Note that this doesn't fit in a signed integer
def f(l: Long) = l + 8e9/(3+l)
def longRange(a: Long, b: Long) = new Iterator[Long] {
  private[this] var i = a
  def hasNext = i<b
  def next = { val j = i; i += 1; j }
}

val cpus = 4
val ranges = (1 to cpus).map(i => longRange(((i-1)*x)/cpus, (i*x)/cpus))
val maxes = ranges.map(r => scala.actors.Futures.future(r.map(f).max))
println("Total max is " + maxes.map(_()).max)

在这里,您手动拆分工作并要求计算范围内每个部分的最大值,该值由迭代器按需提供。这些是在未来计算的,也就是说,Futures.future 返回一个承诺,它最终将交付返回值。当调用myFuture.apply() 时,承诺实际上是保持不变的,在本例中是println 中的_()。要获得总最大值,您必须取最大值的最大值,而这当然不能返回,直到所有推迟到未来的工作真正完成。

如果您想验证它是否正常工作,您可以尝试比较四线程和单线程版本的运行时。

(请注意,我提供的函数的答案应该是 4.000000001e9。)

另请注意,如果您真的希望事情快速运行,您可能应该编写自己的范围测试:

def maxAppliedRange(a: Long, b: Long, f: Long=>Double) = {
  var m = f(a)
  var i = a
  while (i < b) {
    val x = f(i)
    if (m < x) m = x
    i += 1
  }
  m
}
val maxes = (1 to cpus).map(i => 
  scala.actors.Futures.future( maxAppliedRange((i-1)*x/cpus,i*x/cpus,f) )
)
println("Total max is " + maxes.map(_()).max)

这提供了更好的性能,因为没有装箱/拆箱,因此垃圾收集器不会受到压力,因此并行运行会产生更好的结果。这对我来说比上面的方法快 40 倍,注意并行集合也是如此。所以要小心!仅仅使用更多的内核并不一定能加快计算速度,尤其是在处理大量垃圾的任务时。

【讨论】:

  • 感谢您周到的回复。我尝试了这两个示例,发现它们都有效,但是第二个示例要快得多。不过我有几个问题。当我将 cpus 变量设置为 1 时,我的程序需要 165 秒,但当我将其设置为 2 时需要 98 秒。为什么不需要 82 秒?我期待更多的改进。此外,我的下一步是使用我的代码并使用网格增益和云计算来增强它,使其更加强大。你对我这样做有什么建议吗?
  • @Jim - 存在与设置问题相关的开销,并且在两个线程中较慢的线程完成之前您无法得到答案(如果没有其他原因,一个会更慢,而不是那个 CPU可用于执行更多操作系统任务)。我认为在迁移到云之前了解更多关于并行计算的知识是一个非常好的主意。有很多问题需要思考。如果机器在计算过程中出现故障怎么办?您如何平衡在慢速和快速机器上的工作?您如何收集和分发作品?您如何平衡通信与计算?
  • 我做了一些谷歌搜索,发现有很多关于并行计算的知识。我有很多阅读要做。另外,您对我可以在哪里了解有关同时使用 Scala 的更多信息有什么建议吗?我去过 Scala 网站并浏览了他们的文档,但我想了解更多。
【解决方案2】:

我认为您可以通过使用期货以及使用全局参与者线程池来简洁地做到这一点。与您的原始示例保持一致:

import scala.actors.Futures._

def parMap(f: Long => (Double,Long), xs: List[Int]) : Array[(Double,Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  val futures = (0 until xs.length).map { i =>
    future { results(i) = f(xs(i)) }
  }
  futures.foreach(_())
  results
}

结果:

scala> parMap(l => (l.toDouble,l), List(1,2,3))
res2: Array[(Double, Long)] = Array((1.0,1), (2.0,2), (3.0,3))

这将使要完成的工作并行化。如果您想针对您拥有的处理器数量对其进行优化,您可以使用actors.corePoolSize 和actors.maxPoolSize 属性设置actor 池的大小。

【讨论】:

    猜你喜欢
    • 2020-01-17
    • 2016-09-27
    • 2020-11-22
    • 1970-01-01
    • 1970-01-01
    • 2012-06-23
    • 1970-01-01
    • 2015-08-27
    • 1970-01-01
    相关资源
    最近更新 更多