【问题标题】:Spark RDD aggregate /fold operation business scenario [duplicate]Spark RDD聚合/折叠操作业务场景[重复]
【发布时间】:2018-05-24 04:13:01
【问题描述】:

[Edit] 实际上,我的问题是关于 Spark RDD 聚合操作的业务场景/要求,尤其是对于 zeroValue 和 RDD 分区,而不是关于它在 Spark 中的工作方式。很抱歉造成混乱。

我正在学习各种 Spark RDD 计算。在研究 Spark RDD 聚合/折叠相关时,我无法考虑聚合/折叠的业务场景。

例如,我将通过折叠计算 RDD 中的值总和。

val myrdd1 = sc.parallelize(1 to 10, 2)
myrdd1.fold(1)((x,y) => x + y)

它返回 58。

如果我们将分区号从 2 更改为 4,它会返回 60。但我希望是 55。

我明白如果在制作 myrdd1 时没有给出分区号,spark 会做什么。它将采用未知的默认分区号。返回值将是“不稳定的”。

所以我不知道为什么Spark有这种逻辑。有没有真实的业务场景有这种需求?

【问题讨论】:

    标签: scala apache-spark aggregate rdd


    【解决方案1】:

    fold 聚合每个分区的数据,从第一个括号中的zero value 开始。分区聚合结果最后与零值组合。

    因此,对于 2 个分区,您正确收到 58: (1+1+2+3+4+5)+(1+6+7+8+9+10)+1

    同样,对于 4 个分区,正确的结果是 60: (1+1+2+3)+(1+4+5+6)+(1+7+8)+(1+9+10)+1

    对于现实世界的场景,这种计算(类似分而治之)可能在您有可交换逻辑的任何地方都很有用,即当运算执行的顺序无关紧要时,例如在数学加法中。这样,Spark 只会在网络中移动聚合的部分结果,而不是,例如,打乱整个块。

    如果您使用 treeReduce 而不是 fold,您对“收到 55”的期望:

    "treeReduce" should "compute sum of numbers" in {
      val numbersRdd = sparkContext.parallelize(1 to 20, 10)
      val sumComputation = (v1: Int, v2: Int) => v1 + v2
    
      val treeSum = numbersRdd.treeReduce(sumComputation, 2)
    
      treeSum shouldEqual(210)
      val reducedSum = numbersRdd.reduce(sumComputation)
      reducedSum shouldEqual(treeSum)
    

    }

    前段时间写了一篇关于RDD中树聚合的小贴:http://www.waitingforcode.com/apache-spark/tree-aggregations-spark/read

    【讨论】:

    • 感谢您的回复。我不清楚你的真实世界场景。似乎您的示例是说聚合仅用于火花内部执行(随机播放)计算。我还有一个例子,我在一个 RDD 中有 10 个数字,RDD 有两个分区。我将获得 10 个数字中的最大值和第二个最大值。数字是 1,2,3,4,5,6,7,8,9,10。如果 1、2、3、4、7 在一个分区上,而 5、6、8、9、10 在另一个分区上,则计算返回可能是 7 和 10。但正确答案必须是 9 和 10。
    • 对于您的示例,最大数字为 2,如果不是返回 1 个数字/分区,而是返回 2 个数字,您将得到正确的结果。如果您可以从部分结果中计算出最终结果,则可以使用 fold,即您在每个分区上都有一个数据子集,然后生成部分结果,最后将它们组合到最终结果中。
    • 是的,你是对的。我可以更改聚合的第二个参数以获得正确答案。我只是很困惑这些计算的真正含义是什么,尤其是分区数会影响计算返回。
    • 分区是 Spark 中的并行化单元。你有更多的分区,[理论上]你会更快地计算你的结果。 IMO 折叠和聚合是互补的。不同之处在于 fold 必须返回与输入完全相同的类型,而 reduce 可以转换类型并在这个新类型上累积。
    【解决方案2】:

    我认为您现在得到的结果符合预期,我将尝试解释它是如何工作的。

    You have an `rdd` with 10 elements in two partition  
    val myrdd1 = sc.parallelize(1 to 10, 2)
    

    假设两个分区包含p1 = {1,2,3,4,5}p2 = {6,7,8,9,10}

    现在根据文档,折叠在每个分区中运行 现在你得到(default value or zero value which is one in your case) +1+2+3+4+5 = 16(1 as zero value)+7+8+9+10 = 41

    最后折叠那些(1 as zero value) 16 + 41 = 58

    同样,如果您有4 分区fold 在默认值为1 的四个分区中运行,并将四个结果与另一个fold1 的默认值相结合,结果为60

    聚合每个分区的元素,然后对所有的结果进行聚合 分区,使用给定的关联函数和中性的“零 value”。函数 op(t1, t2) 允许修改 t1 并返回它 作为其结果值以避免对象分配;但是,它不应该 修改t2。

    这与实现的折叠操作有些不同 Scala 等函数式语言中的非分布式集合。这 折叠操作可以单独应用于分区,然后 将这些结果折叠到最终结果中,而不是应用折叠 以某种定义的顺序依次传递给每个元素。对于函数 不是可交换的,结果可能与折叠的结果不同 应用于非分布式集合。

    对于总和零值应该是0,这会给你正确的结果55

    希望这会有所帮助!

    【讨论】:

    • 感谢您如此详细的回复,我明白了。我只是不明白为什么 Spark 会创建这样的计算。据我了解,分区是 Spark 的 RDD,它应该属于 Spark env。但是现在由于环境的原因我们得到了不同的回报。
    猜你喜欢
    • 1970-01-01
    • 2020-01-02
    • 2015-10-07
    • 2021-04-07
    • 2019-02-11
    • 1970-01-01
    • 2017-07-11
    • 1970-01-01
    相关资源
    最近更新 更多