【问题标题】:Optimizing list processing in Scala在 Scala 中优化列表处理
【发布时间】:2014-01-23 09:09:29
【问题描述】:

目前正在处理来自 Mixpanel API 的大量 Json 数据。使用一个小数据集,这很容易,下面的代码运行得很好。但是,处理大型数据集需要相当长的时间,因此我们开始看到超时。

我的 Scala 优化技能相当差,所以我希望有人能展示一种更快的方法来处理大型数据集的以下内容。请务必解释为什么,因为这将有助于我自己对 Scala 的理解。

val people = parse[mp.data.Segmentation](o)
val list = people.data.values.map(b => 
  b._2.map(p => 
    Map(
      "id" -> p._1, 
      "activity" -> p._2.foldLeft(0)(_+_._2)
    )
  )
)
.flatten
.filter{ behavior => behavior("activity") != 0 }
.groupBy(o => o("id"))
.map{ case (k,v) => Map("id" -> k, "activity" -> v.map( o => o("activity").asInstanceOf[Int]).sum) }

还有Segmentation类:

case class Segmentation(
  val legend_size: Int,
  val data: Data
)

case class Data(
  val series: List[String],
  val values: Map[String, Map[String, Map[String, Int]]]
)

感谢您的帮助!

编辑:按要求提供样本数据

{"legend_size": 4, "data": {"series": ["2013-12-17", "2013-12-18", "2013-12-19", "2013-12-20", "2013-12-21", "2013-12-22", "2013-12-23", "2013-12-24", "2013-12-25", "2013-12-26", "2013-12-27", "2013-12-28", "2013-12-29", "2013-12-30", "2013-12-31", "2014-01-01", "2014-01-02", "2014-01-03", "2014-01-04", "2014-01-05", "2014-01-06"], "values": {"afef4ac12a21d5c4ef679c6507fe65cd": {"id:twitter.com:194436690": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 1, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:330103796": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 1, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:216664121": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 1, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:414117608": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 1, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}}}}}

为了回答 Millhouse 的问题,其目的是总结每个日期,以提供一个数字来描述每个 ID 的“活动”总量。 “ID”的格式为id:twitter.com:923842

【问题讨论】:

  • 您能否提供一些测试 JSON 数据的 sn-p,并解释其意图是什么(看起来它是对每个 idactivity 值求和?)跨度>
  • 在大数据的情况下,可用堆空间不足会导致显着减速。使用标准的 Java 内存分析工具来找出答案。
  • @millhouse 示例数据已按要求添加

标签: performance scala optimization scala-collections


【解决方案1】:

我不知道您处理的全部范围、您正在进行的管道、您的服务器承受的压力或您设置了哪种线程配置文件来接收信息。但是,假设您已将 I/O 与 CPU 绑定任务正确分离,并且您向我们展示的内容是严格的 CPU 绑定,请尝试简单地将 .par 添加到第一个 Map。

people.data.values.par.map(b =>

作为第一次通过,看看您是否可以获得一些性能提升。我没有看到处理所需的任何特定顺序,这告诉我并行化已经成熟。

编辑

在玩过并行化之​​后,我要补充一点,修改 TaskSupport 对这种情况很有帮助。您可以像这样修改并行化集合的 tasksupport

import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(2))

http://www.scala-lang.org/api/2.10.3/index.html#scala.collection.parallel.TaskSupport

【讨论】:

  • 有趣,不知道par。我会注意到,您需要调用 .toList 之类的东西才能将其转换回普通集合。现在测试...
  • 编辑答案以协助解决。谢谢,这很有帮助。
【解决方案2】:

我有一些建议可能会有所帮助。

  1. 我会尝试在程序中尽早移动过滤器命令 可能的。由于您的数据包含许多活动为 0 的日期,因此您 会看到这样做的改进。最好的解决方案可能是 在解析 json 数据时对此进行测试。如果这是不可能的 让它成为第一个声明。

  2. 按照我的理解,您最终希望找到一种方法来查找 给定 id 的总和。我建议你用 id 的地图来表示这个 到聚合。 scala List 类也有一个 sum 函数。 我想出了这段代码:

    val originalList_IdToAggregate = people.data.values.map(p=> (p._2._1, p._2._2.sum) );

    它可能与您的项目不直接匹配,但我认为它几乎是您所需要的。 如果您需要为此制作地图,只需将 toMap 附加到末尾即可。

  3. 如果这不能为您提供足够的速度,您可以创建自己的解析器来聚合 并在仅解析这种 json 时进行过滤。 如果您使用解析器组合器,在 scala 中编写解析器非常容易。 只要记住尽早扔掉你不需要的东西而不是制造 太多深分支,这应该是一个内存占用少的快速解决方案。

  4. 至于并行,这可能是个好主意。我还不够了解 你的应用程序告诉你什么是最好的方法,但它可能是可能的 将处理数据的计算成本隐藏在 传输数据。尝试在多个上平衡解析和 io 线程,看看你是否能做到这一点。

【讨论】:

    猜你喜欢
    • 2014-12-30
    • 2012-10-27
    • 2013-03-25
    • 2013-06-21
    • 2016-01-23
    • 2018-07-29
    • 2018-07-25
    • 2018-07-26
    相关资源
    最近更新 更多