【问题标题】:How to sum values and group them by a key value in Scala's List of Map?如何对值求和并按 Scala 的 Map List 中的键值对它们进行分组?
【发布时间】:2015-03-29 20:23:58
【问题描述】:

我有一个地图列表:

val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

我想以最有效的方式对value 求和并按id 值对它们进行分组,这样它就变成了:

Map(A -> 25, B -> 10, C -> 4, D -> 60)

【问题讨论】:

  • 地图总是只有两个元素吗?当值是两种不同类型时,为什么要使用 Map 呢?从答案中可以看出,这意味着令人不快的演员阵容。这里有点设计的味道......
  • 哪一个? val list 还是预期的结果? val list 中的映射可以有其他键/值。我将更新我的示例。此外,这个问题与现实世界的应用程序设计无关。
  • 谢谢。但我确实认为 String 映射的尴尬 -> Any 值得评论....
  • 我知道。请把它当作一个问题的挑战:)

标签: scala


【解决方案1】:

A) 如果您有许多具有相同 id 的项目,则此项目的可读性和性能最高:

scala> list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
res14: scala.collection.immutable.Map[Any,Int] = Map(D -> 60, A -> 25, C -> 4, B -> 10)

您也可以使用list.groupBy(_("id")).par...。只有当您有许多具有相同键的元素时,它才会运行得更快,否则它会非常慢。

否则,更改线程的上下文本身会使.par 版本变慢,因为map(_"value").sum(您的嵌套map-reduce)可能比在线程之间切换更快。如果N = 系统中的核心数,那么您的map-reduce 应该慢@​​987654330@ 倍才能从par 中受益,当然粗略地说。

B) 因此,如果并行化效果不佳(最好通过性能测试进行检查),您可以以专门的方式“重新实现”groupBy

val m = scala.collection.mutable.Map[String, Int]() withDefaultValue(0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])

C) 最并行化的选项是:

val m = new scala.collection.concurrent.TrieMap[String, Int]()
for (e <- list.par; k = e("id").toString) {
    def replace = {           
       val v = m(k)
       m.replace(k, v, v + e("value").asInstanceOf[Int]) //atomic
    }
    m.putIfAbsent(k, 0) //atomic
    while(!replace){} //in case of conflict
}

scala> m
res42: scala.collection.concurrent.TrieMap[String,Int] = TrieMap(B -> 10, C -> 4, D -> 60, A -> 25)

D) 使用 scalaz semigroups,函数式风格中并行度最高的(每次合并 map 时速度较慢,但​​最适合分布式 map-reduce 没有共享内存):

import scalaz._; import Scalaz._
scala> list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
    .par.reduce(_ |+| _)
res3: scala.collection.immutable.Map[String,Int] = Map(C -> 4, D -> 60, A -> 25, B -> 10)

但只有使用比“+”更复杂的聚合时,它才会更高效。


那么我们来做简单的性能测试:

def time[T](n: Int)(f: => T) = {
  val start = System.currentTimeMillis()
  for(i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}

这是在 MacBook Pro 2.3 GHz Intel Core i7 上使用 JDK8 在 Scala 2.12 REPL 中完成的。每个测试启动两次 - 首先是预热 JVM。

1) 对于您的输入集合和time(100000){...},从最慢到最快:

`par.groupBy.par.mapValues` = 0.13861 ms
`groupBy.par.mapValues` = 0.07667 ms
`most parallelized` = 0.06184 ms    
`scalaz par.reduce(_ |+| _)` = 0.04010 ms //same for other reduce-based implementations, mentioned here
`groupBy.mapValues` = 0.00212 ms
`for` + `update` with mutable map initialization time = 0.00201 ms
`scalaz suml` = 0.00171 ms      
`foldLeft` from another answer = 0.00114 ms
`for` + `update` without mutable map initialization time = 0.00105

因此,来自另一个答案的foldLeft 似乎是您输入的最佳解决方案。

2) 让我们把它变大

 scala> val newlist = (1 to 1000).map(_ => list).reduce(_ ++ _)

现在输入newListtime(1000){...}

 `scalaz par.reduce(_ |+| _)` = 1.422 ms
 `foldLeft`/`for` = 0.418 ms
 `groupBy.par.mapValues` = 0.343 ms

这里最好选择groupBy.par.mapValues

3) 最后,让我们定义另一个聚合:

scala> implicit class RichInt(i: Int){ def ++ (i2: Int) = { Thread.sleep(1); i + i2}}
defined class RichInt

并使用listtime(1000) 进行测试:

`foldLeft` = 7.742 ms
`most parallelized` = 3.315 ms

所以这里最好使用大多数并行化的版本。


为什么 reduce 这么慢:

让我们采用 8 个元素。它产生一个从叶子[1] + ... + [1]到根[1 + ... + 1]的计算树:

time(([1] + [1]) + ([1] + [1]) + ([1] + [1]) + ([1] + [1]) 
   => ([1 +1] + [1 +1]) + ([1 + 1] + [1 + 1]) 
   => [1 + 1 + 1 + 1] + [1 + 1 + 1 + 1]) 
 = (1 + 1 + 1 + 1) +  (2 + 2) + 4 = 12

时间(N = 8) = 8/2 + 2*8/4 + 4*8/8 = 8 * (1/2 + 2/4 + 4/8) = 8 * log2(8)/ 2 = 12

或者只是:

当然,这个公式只适用于实际上是 2 的幂的数字。无论如何,复杂度是 O(NlogN),比 foldLeftO(N) 慢。即使在并行化之后,它也只是O(N),所以这个实现只能用于大数据的分布式 Map-Reduce,或者只是说当你没有足够的内存并将你的 Map 存储在某个缓存中时。

您可能会注意到,对于您的输入,它比其他选项的并行化效果更好——这只是因为对于 6 个元素,它并没有那么慢(这里几乎是 O(1))——而且你只进行了一次 reduce 调用——当其他选项之前对数据进行分组时或者只是创建更多线程,这会导致更多“线程切换”开销。简单地说,reduce 在这里创建的线程更少。但是,如果您有更多数据 - 它当然不起作用(参见实验 2)。

【讨论】:

  • 我想说“你再说一遍”,但这听起来很陈词滥调:)。无论如何,list.par.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)list.groupBy(_("id")).par.mapValues(_.map(_("value").asInstanceOf[Int]).sum) 之间哪个更好?
  • 很快,这取决于您的收藏 - 因此最好运行性能测试以选择合适的解决方案。
  • 一如既往,您的回答总是给我带来新的东西:)。只是一点建议:不要使用map 作为变量名,例如val map,可能会误用map方法。
  • 好的。关于单字母命名,你会怎么说:) cse.unsw.edu.au/~cs3161/14s2/StyleGuide.html(我知道,Scala 不是 Haskell)
  • 这不是并行收集的源代码 - 只需尝试 List(1,2,3,4,5,6,7,8).par.reduce((a, b) =&gt; {println(a + " " + b); a + b}) (执行几次) - 并比较它们。我已经用 reduce 示例更新了答案(参见 D)。 reduce 正在处理半群,这意味着某些“+”操作的关联性,这意味着顺序在这里无关紧要。 reduceLeft 以与列表中存在的元素相同的顺序执行操作。
【解决方案2】:

也使用foldLeft:

list.foldLeft(Map[String, Int]().withDefaultValue(0))((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key) + v("value").asInstanceOf[Int]))
})

更新: reduceLeft

(Map[String, Any]().withDefaultValue(0) :: list).reduceLeft((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key).asInstanceOf[Int] + v("value").asInstanceOf[Int]))
})

顺便说一句,如果您查看reduceLeft 定义,您会发现它使用相同的foldLeft

  def reduceLeft[B >: A](f: (B, A) => B): B =
    if (isEmpty) throw new UnsupportedOperationException("empty.reduceLeft")
    else tail.foldLeft[B](head)(f)

更新 2: 使用 parreduce: 这里的问题是区分结果 Map 值和初始 Map 值。我选择了contains("id")

list.par.reduce((a, b) => {
  def toResultMap(m: Map[String, Any]) =
    if (m.contains("id"))
      Map(m("id").toString -> m("value")).withDefaultValue(0)
    else m
  val aM = toResultMap(a)
  val bM = toResultMap(b)
  aM.foldLeft(bM)((res, v) =>
    res + (v._1 -> (res(v._1).asInstanceOf[Int] + v._2.asInstanceOf[Int])))
})

【讨论】:

  • 性能与 scalaz 的 |+|list.map(x =&gt; Map(x("id").asInstanceOf[String] -&gt; x("value").asInstanceOf[Int])).par.reduce((map1,map2) =&gt; map1 ++ map2.map{ case (k,v) =&gt; k -&gt; (v + map1.getOrElse(k,0)) }) from this 答案中的相同
  • 比你的foldLeft慢40倍
【解决方案3】:

我不知道“最有效”,但我能想到的最好的方法是使用 scalaz suml,它使用 MonoidMonoid for Map 完全符合您的要求。唯一丑陋的部分是将这些 Map[String, Any]s 转换成更良好的类型并代表我们想要的结构(例如 Map("A" → 20))。

import scalaz._, Scalaz._
list.map{m => 
  Map(m("id").asInstanceOf[String] → m("value").asInstanceOf[Int])
}.suml

【讨论】:

    【解决方案4】:

    Scala 2.13 开始,您可以使用groupMapReduce 方法(顾名思义)等效于groupBy,后跟mapValuesreduce 步骤:

    // val list = List(Map("id" -> "A", "value" -> 20, "name" -> "a"), Map("id" -> "B", "value" -> 10, "name" -> "b"), Map("id" -> "A", "value" -> 5, "name" -> "a"), Map("id" -> "C", "value" -> 1, "name" -> "c"), Map("id" -> "D", "value" -> 60, "name" -> "d"), Map("id" -> "C", "value" -> 3, "name" -> "c"))
    list.groupMapReduce(_("id"))(_("value").asInstanceOf[Int])(_ + _)
    // Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60)
    

    这个:

    • groups 按其“id”字段 (_("id")) 映射(groupMapReduce 的组部分)

    • maps 每个分组映射到他们键入回 Int 的“值”字段 (_("value").asInstanceOf[Int])(映射组的一部分MapReduce)

    • 每个组 (_ + _) 中的reduces 值通过求和(减少 groupMap 的一部分Reduce)。

    这是one-pass version 可以翻译的内容:

    list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).reduce(_ + _)).toMap
    

    【讨论】:

      猜你喜欢
      • 2018-10-26
      • 2011-05-26
      • 2021-07-25
      • 2019-08-29
      • 1970-01-01
      • 2018-03-17
      • 1970-01-01
      • 2019-09-28
      相关资源
      最近更新 更多