【问题标题】:Spark: getting cumulative frequency from frequency valuesSpark:从频率值中获取累积频率
【发布时间】:2015-03-14 09:13:28
【问题描述】:

我的问题在单节点环境中回答起来相当简单,但我不知道如何在分布式 Spark 环境中做同样的事情。我现在拥有的是一个“频率图”,其中对于每个项目我都有它出现的次数。例如,它可能是这样的:(1, 2), (2, 3), (3,1),表示 1 发生了 2 次、2 3 次等等。

我想得到的是每个项目的累积频率,所以我需要从上面的实例数据中得到的结果是:(1, 2), (2, 3+2=5), (3, 1+3+2=6)

到目前为止,我已经尝试通过使用mapPartitions 来做到这一点,如果只有一个分区,它会给出正确的结果......否则显然不会。

我该怎么做?

谢谢。 马可

【问题讨论】:

  • 嗨,每个元组中的第一个位置是“唯一 id”吗?我的意思是,可以在 rdd 中再次找到 (1,2) 和其他位置:(1,) ?
  • 它可以被认为是一个唯一的id,因为RDD是在这一步之前聚合在那个值上的......

标签: apache-spark cumulative-frequency


【解决方案1】:

我不认为你想要的在 Spark 中作为分布式转换是可能的,除非你的数据小到可以聚合到单个分区中。 Spark 函数通过将作业分配给远程进程来工作,唯一的通信方式是使用返回一些值的操作或使用累加器。不幸的是,分布式作业无法读取累加器,它们是只写的。

如果您的数据足够小以适合单个分区/进程的内存,您可以合并 (1),然后您现有的代码将工作。如果不是,但单个分区可以放入内存,那么您可以使用本地迭代器:

var total = 0L
rdd.sortBy(_._1).toLocalIterator.foreach(tuple => {
    total = total + tuple._2;
    println((tuple._1, total)) // or write to local file
})

【讨论】:

  • 你写的正是我的想法,但我希望比我聪明的人能找到合适的解决方案......
  • 我认为这与框架不匹配。 Spark用于大型数据集的并行计算,您需要单线程计算,因此您的困难。祝你好运。
【解决方案2】:

如果我正确理解了您的问题,它看起来确实适合其中一个组合器功能 - 看看不同版本的 aggregateByKeyreduceByKey 功能,两者位于here

【讨论】:

  • 我认为你没有理解这个问题,或者你不知道这些函数是如何工作的:它们聚合或减少具有相同键的值,而我需要聚合不同键的值......
  • 我不知何故错过了关键关键字(“累积”)并匆忙得出结论。对此感到抱歉。
猜你喜欢
  • 2012-10-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-14
  • 2012-06-24
  • 1970-01-01
  • 1970-01-01
  • 2011-06-08
相关资源
最近更新 更多