【发布时间】:2017-12-15 17:04:48
【问题描述】:
scala 并行集合中是否有任何等效于 LINQ 的withDegreeOfParallelism 的设置将运行查询的线程数?我想并行运行一个需要运行一定数量的线程的操作。
【问题讨论】:
scala 并行集合中是否有任何等效于 LINQ 的withDegreeOfParallelism 的设置将运行查询的线程数?我想并行运行一个需要运行一定数量的线程的操作。
【问题讨论】:
使用最新的主干,使用 JVM 1.6 或更高版本,使用:
collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(parlevel: Int)
不过,这可能会在未来发生变化。计划在下一个版本中采用更统一的方法来配置所有 Scala 任务并行 API。
但是请注意,虽然这将确定查询使用的处理器数量,但这可能不是运行查询所涉及的实际线程数。由于并行集合支持嵌套并行,实际线程池实现可能会在检测到有必要时分配更多线程来运行查询。
编辑:
从 Scala 2.10 开始,设置并行级别的首选方法是将 tasksupport 字段设置为新的 TaskSupport 对象,如下例所示:
scala> import scala.collection.parallel._
import scala.collection.parallel._
scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)
scala> pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a
scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)
在使用分叉连接池实例化ForkJoinTaskSupport 对象时,必须将分叉连接池的并行度设置为所需的值(示例中为2)。
【讨论】:
独立于 JVM 版本,使用 Scala 2.9+(引入并行集合),您还可以使用 grouped(Int) 和 par 函数的组合在小块上执行并行作业,如下所示:
scala> val c = 1 to 5
c: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5)
scala> c.grouped(2).seq.flatMap(_.par.map(_ * 2)).toList
res11: List[Int] = List(2, 4, 6, 8, 10)
grouped(2) 创建长度为 2 或更少的块,seq 确保块的集合不是并行的(在本例中无用),然后在小的并行块上执行 _ * 2 函数(使用 @ 创建987654327@),从而保证最多2个线程并行执行。
不过,这可能比设置工作池参数的效率略低,我不确定。
【讨论】: