【问题标题】:Why result of Spark reduceByKey is not consistent为什么 Spark reduceByKey 的结果不一致
【发布时间】:2016-09-06 23:44:08
【问题描述】:

我正在尝试使用 scala 通过 spark 计算每行的迭代次数。
以下是我的输入:

1 维克拉姆
2 萨钦
3 小号
4 好了
5 阿库尔
5 阿库尔
1 维克拉姆
1 维克拉姆
3 小号
10阿舒
5 阿库尔
1 维克拉姆
2 萨钦
7 维克拉姆

现在我创建了 2 个单独的 RDD,如下所示。

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)
//now if i create a RDD as
val rd1 = m1.reduceByKey((a,b) => a+b )
rd1.collect().foreach(println)
//I get a proper output i.e (it gives correct output every time)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

//but if i create a RDD as
val rd2 = m1.reduceByKey((a,b) => a+1 )
rd2.collect().foreach(println)
//I get a inconsistent result i.e some times i get this (WRONG)
//output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
//and sometimes I get this as output (CORRECT)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1) 

我无法理解为什么会发生这种情况以及在哪里使用什么。我也尝试过将 RDD 创建为

val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// Then also same issue occurs with a+1 but every thing works fine with a+b

【问题讨论】:

    标签: scala hadoop apache-spark


    【解决方案1】:

    reduceByKey 假定传递的函数是 commutativeassociative(正如 docs 明确指出的那样)。 而且 - 你的第一个函数(a, b) => a + b,但(a, b) => a+1不是

    为什么? 一方面 - reduceByKey 将提供的函数应用于每个分区,然后应用于所有分区的组合结果。换句话说,b 并不总是1,所以使用a+1 是不正确的。

    考虑以下场景 - 输入包含 4 条记录,分为两个分区:

    (aa, 1)
    (aa, 1)
    
    (aa, 1)
    (cc, 1)
    

    reduceByKey(f) 在这个输入上的计算可能如下:

    val intermediate1 = f((aa, 1), (aa, 1)) 
    val intermediate2 = f((aa, 1), (cc, 1))
    
    val result = f(intermediate2, intermediate1)
    

    现在,让我们关注f = (a, b) => a + b

    val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
    val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)
    
    val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)
    

    还有f = (a, b) => a + 1:

    val intermediate1 = f((aa, 1), (bb, 1))       // (aa, 2)
    val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)
    
    // this is where it goes wrong:
    val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)
    

    主要是 - 中间计算的顺序无法保证,并且可能在执行之间发生变化,对于后一种非交换函数的情况,这意味着结果有时是错误的。

    【讨论】:

    • 非常感谢 Tzach 的澄清,但是这应该每次都成立吧?为什么我在某些运行中得到正确的结果?
    • @Tzach 我觉得这里的第二种情况会产生相同的结果。如果在最后一步 val result = f(intermediate2, intermediate1) 发生这将导致 //(aa, 2), (cc, 1),它将给出不同的结果。如果我在这里错了,请告诉我
    • 是不是当intermediate1(a say in partition)和intermediate2(b say in partition)到达驱动程序时,它们可能会被洗牌。中间 1 变成 b 中间 2 变成 a ?因此而不是 val result = f(intermediate1, intermediate2) 它变成 val result = f(intermediate2, intermediate1) 因此 1+1 ==2 而不是 2+1==3
    • 两个 cmets 都是正确的 - 对交换性和关联性的需求源于不能保证顺序的事实。我会相应地更新答案 - 搞砸了这个例子:)
    【解决方案2】:

    函数 (a , b) => (a + 1) 在本质上无法关联。 结合律说,​​

    f(a ,f(b , c)) = f(f(a , b), c) 
    

    假设以下键:

    a = (x, 1)
    b = (x, 1)
    c = (x, 1)
    

    应用函数 (a , b) => (a + 1)

    f(a ,f(b , c)) = (x , 2)
    

    但是,

    f(f(a , b), c) = (x , 3)
    

    因此,它不是关联的,不适用于 reduceByKey。

    【讨论】:

      猜你喜欢
      • 2018-01-20
      • 2021-04-02
      • 2011-09-18
      • 1970-01-01
      • 2021-04-20
      • 1970-01-01
      • 1970-01-01
      • 2017-06-10
      • 1970-01-01
      相关资源
      最近更新 更多