【发布时间】:2017-09-07 22:23:31
【问题描述】:
谁能解释reducebykey、groupbykey、aggregatebykey和combinebykey之间的区别?我已阅读有关此的文档,但无法理解确切的区别。
有例子的解释会很棒。
【问题讨论】:
标签: apache-spark grouping reducing
谁能解释reducebykey、groupbykey、aggregatebykey和combinebykey之间的区别?我已阅读有关此的文档,但无法理解确切的区别。
有例子的解释会很棒。
【问题讨论】:
标签: apache-spark grouping reducing
groupByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey 可能会导致磁盘不足问题,因为数据通过网络发送并收集到减少的工作人员身上。
reduceByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。 reduceByKey 需要将所有值组合成另一个具有完全相同类型的值。
aggregateByKey:
同reduceByKey,取一个初始值。
3 个参数作为输入
例子:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出: 按键汇总结果 酒吧 -> 3 富 -> 5
combineByKey:
3 个参数作为输入
aggregateByKey,不需要总是传递常量,我们可以传递一个返回新值的函数。例子:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey,aggregateByKey,combineByKey 首选 groupByKey
参考: Avoid groupByKey
【讨论】:
if clause 来检查组合器是否只是加法,如果是,使用reduceByKey 逻辑?我在这里缺少什么来理解为什么不能在编译时完成吗?仅通过对组合器进行硬编码来提高效率意味着如果没有对通用组合器进行多次此类检查,则应该进行此类检查,对吧?
groupByKey() 只是根据键对数据集进行分组。当 RDD 尚未分区时,这将导致数据混洗。reduceByKey() 类似于分组+聚合。我们可以说reduceByKey() 等价于 dataset.group(...).reduce(...)。与groupByKey() 不同,它将洗牌更少的数据。aggregateByKey() 在逻辑上与 reduceByKey() 相同,但它允许您返回不同类型的结果。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入, (1,"six") 作为输出。它还采用 零值,将应用于每个键的开头。注意:一个相似之处是它们都是宽操作。
【讨论】:
rdd.groupByKey、rdd.reduceByKey 和sql.groupBy 之间是否有区别?我有一个大型数据集,想使用性能最高的方法。谢谢
虽然 reducebykey 和 groupbykey 会产生相同的答案,但 reduceByKey 示例在大型数据集上效果更好。那是 因为 Spark 知道它可以将输出与每个节点上的公共键结合起来 在洗牌数据之前进行分区。
另一方面,当调用 groupByKey - 所有的键值对 被洗牌了。这是很多不必要的数据 通过网络传输。
更多详情请查看以下链接
【讨论】:
虽然它们都将获取相同的结果,但两个函数的性能存在显着差异。与groupByKey() 相比,reduceByKey() 更适用于更大的数据集。
在reduceByKey() 中,在对数据进行混洗之前,将同一台机器上具有相同键的对组合(通过使用传递给reduceByKey() 的函数)。然后再次调用该函数以减少每个分区中的所有值以产生一个最终结果。
在groupByKey() 中,所有的键值对都被打乱了。这是通过网络传输的大量不必要的数据。
【讨论】:
ReduceByKey reduceByKey(func, [numTasks])-
数据被组合在一起,因此在每个分区中,每个键至少应该有一个值。 然后 shuffle 发生,并通过网络发送到某个特定的 executor 以执行一些操作,例如 reduce。
GroupByKey - groupByKey([numTasks])
它不会合并键的值,而是直接发生随机播放过程 这里有很多数据被发送到每个分区,与初始数据几乎相同。
每个键的值的合并是在洗牌之后完成的。 这里大量数据存储在最终工作节点上,因此导致内存不足问题。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
它类似于reduceByKey,但您可以在执行聚合时提供初始值。
reduceByKey的使用
reduceByKey 可以在我们在大型数据集上运行时使用。
reduceByKey当输入输出值类型相同时
超过aggregateByKey
此外,建议不要使用groupByKey,而更喜欢reduceByKey。详情可以参考here。
您也可以参考此question 以更详细地了解reduceByKey 和aggregateByKey。
【讨论】:
那么除了这4个,我们还有
foldByKey 与 reduceByKey 相同,但具有用户定义的零值。
AggregateByKey 将 3 个参数作为输入,并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)
而
ReduceBykey 只接受 1 个参数,这是一个用于合并的函数。
CombineByKey 有 3 个参数,所有 3 个都是函数。与 aggregateBykey 类似,但它可以具有 ZeroValue 函数。
GroupByKey 不带参数并将所有内容分组。此外,它是跨分区数据传输的开销。
【讨论】: