A) 如果您有许多具有相同 id 的项目,则此项目的可读性和性能最高:
scala> list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
res14: scala.collection.immutable.Map[Any,Int] = Map(D -> 60, A -> 25, C -> 4, B -> 10)
您也可以使用list.groupBy(_("id")).par...。只有当您有许多具有相同键的元素时,它才会运行得更快,否则它会非常慢。
否则,更改线程的上下文本身会使.par 版本变慢,因为map(_"value").sum(您的嵌套map-reduce)可能比在线程之间切换更快。如果N = 系统中的核心数,那么您的map-reduce 应该慢@987654330@ 倍才能从par 中受益,当然粗略地说。
B) 因此,如果并行化效果不佳(最好通过性能测试进行检查),您可以以专门的方式“重新实现”groupBy:
val m = scala.collection.mutable.Map[String, Int]() withDefaultValue(0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])
C) 最并行化的选项是:
val m = new scala.collection.concurrent.TrieMap[String, Int]()
for (e <- list.par; k = e("id").toString) {
def replace = {
val v = m(k)
m.replace(k, v, v + e("value").asInstanceOf[Int]) //atomic
}
m.putIfAbsent(k, 0) //atomic
while(!replace){} //in case of conflict
}
scala> m
res42: scala.collection.concurrent.TrieMap[String,Int] = TrieMap(B -> 10, C -> 4, D -> 60, A -> 25)
D) 使用 scalaz semigroups,函数式风格中并行度最高的(每次合并 map 时速度较慢,但最适合分布式 map-reduce 没有共享内存):
import scalaz._; import Scalaz._
scala> list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
.par.reduce(_ |+| _)
res3: scala.collection.immutable.Map[String,Int] = Map(C -> 4, D -> 60, A -> 25, B -> 10)
但只有使用比“+”更复杂的聚合时,它才会更高效。
那么我们来做简单的性能测试:
def time[T](n: Int)(f: => T) = {
val start = System.currentTimeMillis()
for(i <- 1 to n) f
(System.currentTimeMillis() - start).toDouble / n
}
这是在 MacBook Pro 2.3 GHz Intel Core i7 上使用 JDK8 在 Scala 2.12 REPL 中完成的。每个测试启动两次 - 首先是预热 JVM。
1) 对于您的输入集合和time(100000){...},从最慢到最快:
`par.groupBy.par.mapValues` = 0.13861 ms
`groupBy.par.mapValues` = 0.07667 ms
`most parallelized` = 0.06184 ms
`scalaz par.reduce(_ |+| _)` = 0.04010 ms //same for other reduce-based implementations, mentioned here
`groupBy.mapValues` = 0.00212 ms
`for` + `update` with mutable map initialization time = 0.00201 ms
`scalaz suml` = 0.00171 ms
`foldLeft` from another answer = 0.00114 ms
`for` + `update` without mutable map initialization time = 0.00105
因此,来自另一个答案的foldLeft 似乎是您输入的最佳解决方案。
2) 让我们把它变大
scala> val newlist = (1 to 1000).map(_ => list).reduce(_ ++ _)
现在输入newList 和time(1000){...}:
`scalaz par.reduce(_ |+| _)` = 1.422 ms
`foldLeft`/`for` = 0.418 ms
`groupBy.par.mapValues` = 0.343 ms
这里最好选择groupBy.par.mapValues。
3) 最后,让我们定义另一个聚合:
scala> implicit class RichInt(i: Int){ def ++ (i2: Int) = { Thread.sleep(1); i + i2}}
defined class RichInt
并使用list 和time(1000) 进行测试:
`foldLeft` = 7.742 ms
`most parallelized` = 3.315 ms
所以这里最好使用大多数并行化的版本。
为什么 reduce 这么慢:
让我们采用 8 个元素。它产生一个从叶子[1] + ... + [1]到根[1 + ... + 1]的计算树:
time(([1] + [1]) + ([1] + [1]) + ([1] + [1]) + ([1] + [1])
=> ([1 +1] + [1 +1]) + ([1 + 1] + [1 + 1])
=> [1 + 1 + 1 + 1] + [1 + 1 + 1 + 1])
= (1 + 1 + 1 + 1) + (2 + 2) + 4 = 12
时间(N = 8) = 8/2 + 2*8/4 + 4*8/8 = 8 * (1/2 + 2/4 + 4/8) = 8 * log2(8)/ 2 = 12
或者只是:
当然,这个公式只适用于实际上是 2 的幂的数字。无论如何,复杂度是 O(NlogN),比 foldLeft 的 O(N) 慢。即使在并行化之后,它也只是O(N),所以这个实现只能用于大数据的分布式 Map-Reduce,或者只是说当你没有足够的内存并将你的 Map 存储在某个缓存中时。
您可能会注意到,对于您的输入,它比其他选项的并行化效果更好——这只是因为对于 6 个元素,它并没有那么慢(这里几乎是 O(1))——而且你只进行了一次 reduce 调用——当其他选项之前对数据进行分组时或者只是创建更多线程,这会导致更多“线程切换”开销。简单地说,reduce 在这里创建的线程更少。但是,如果您有更多数据 - 它当然不起作用(参见实验 2)。