【问题标题】:Concurrent map/foreach in scalascala中的并发map/foreach
【发布时间】:2010-12-17 15:22:48
【问题描述】:

我有一个迭代 vals: Iterable[T] 和一个长时间运行的函数,没有任何相关的副作用:f: (T => Unit)。现在它以明显的方式应用于vals

vals.foreach(f)

我希望同时调用f(在合理的范围内)。 Scala 基础库中是否有明显的功能?比如:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

虽然f 运行时间相当长,但它足够短,我不希望每次调用都调用线程的开销,所以我正在寻找基于线程池的东西。

【问题讨论】:

    标签: scala concurrency functional-programming


    【解决方案1】:

    2009 年的许多答案仍然使用旧的 scala.actors.Futures._,它们不再在新的 Scala 中。虽然 Akka 是首选方式,但更易读的方式是仅使用并行 (.par) 集合:

    vals.foreach { v => f(v) }

    变成

    vals.par.foreach { v => f(v) }

    另外,使用 parMap 可能看起来更简洁,但需要记住导入常用的 Scalaz*。像往常一样,在 Scala 中有不止一种方法可以做同样的事情!

    【讨论】:

      【解决方案2】:

      ScalazparMap。您可以按如下方式使用它:

      import scalaz.Scalaz._
      import scalaz.concurrent.Strategy.Naive
      

      这将为每个函子(包括Iterable)配备parMap 方法,所以你可以这样做:

      vals.parMap(f)
      

      您还会收到parFlatMapparZipWith 等。

      【讨论】:

        【解决方案3】:

        我喜欢Futures 的答案。但是,虽然它会并发执行,但它也会异步返回,这可能不是您想要的。正确的做法如下:

        import scala.actors.Futures._
        
        vals map { x => future { f(x) } } foreach { _() }
        

        【讨论】:

        • 注意vals 是一个严格的集合——如果它是惰性的(在 Scala 2.7 中,这包括 Range 类),直到每个都需要时才会创建期货foreach,不会同时发生任何事情。
        • 我想我们可以通过在map 和当前foreach 之间注入另一个foreach 调用来解决这个问题。因此:vals map { x => future { f(x) } } foreach { x => x } foreach { _() }
        • 那将是我们必须注入的地图,而不是另一个 foreach?我不清楚惰性集合的映射是否严格。最安全的方法可能是调用 toArray.
        • 你是对的,foreach (显然)是错误的注入,因为它返回Unit。我的错! :-) 惰性集合上的map 函数几乎总是非严格的,所以我们可以调用toList(或toArray),或者我们可以投影然后强制:(vals map { x => future { f(x) } } projection).force foreach { _() }。我不知道这是否比简单的toList更好,但肯定是不同的。
        • 你说“异步返回”是什么意思?这是否意味着它是非阻塞的? (为什么会有问题?)
        【解决方案4】:

        我在 Scala 2.8 中使用 scala.actors.Futures 时遇到了一些问题(当我检查时它有问题)。不过,直接使用 java 库对我有用:

        final object Parallel {
          val cpus=java.lang.Runtime.getRuntime().availableProcessors
          import java.util.{Timer,TimerTask}
          def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms)
          def repeat(n: Int,f: Int=>Unit) = {
            import java.util.concurrent._
            val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1)
            (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)}))
            e.shutdown
            e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
          }
        }
        

        【讨论】:

          【解决方案5】:

          我会使用scala.actors.Futures:

          vals.foreach(t => scala.actors.Futures.future(f(t)))
          

          【讨论】:

            【解决方案6】:

            Functional Java 的最新版本具有一些您可以使用的高阶并发功能。

            import fjs.F._
            import fj.control.parallel.Strategy._
            import fj.control.parallel.ParModule._
            import java.util.concurrent.Executors._
            
            val pool = newCachedThreadPool
            val par = parModule(executorStrategy[Unit](pool))
            

            然后……

            par.parMap(vals, f)
            

            记得shutdownpool

            【讨论】:

              【解决方案7】:

              您可以使用 Scala 标准库中的 Parallel Collections。 它们就像普通的集合一样,但它们的操作是并行运行的。您只需要在调用某些集合操作之前调用par

              import scala.collection._
              
              val array = new Array[String](10000)
              for (i <- (0 until 10000).par) array(i) = i.toString
              

              【讨论】:

                猜你喜欢
                • 2012-01-26
                • 2017-10-17
                • 2018-04-10
                • 2013-06-09
                • 1970-01-01
                • 2014-11-03
                • 1970-01-01
                • 1970-01-01
                • 2018-01-04
                相关资源
                最近更新 更多