【发布时间】:2016-09-06 23:44:08
【问题描述】:
我正在尝试使用 scala 通过 spark 计算每行的迭代次数。
以下是我的输入:
1 维克拉姆
2 萨钦
3 小号
4 好了
5 阿库尔
5 阿库尔
1 维克拉姆
1 维克拉姆
3 小号
10阿舒
5 阿库尔
1 维克拉姆
2 萨钦
7 维克拉姆
现在我创建了 2 个单独的 RDD,如下所示。
val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)
//now if i create a RDD as
val rd1 = m1.reduceByKey((a,b) => a+b )
rd1.collect().foreach(println)
//I get a proper output i.e (it gives correct output every time)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)
//but if i create a RDD as
val rd2 = m1.reduceByKey((a,b) => a+1 )
rd2.collect().foreach(println)
//I get a inconsistent result i.e some times i get this (WRONG)
//output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
//and sometimes I get this as output (CORRECT)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)
我无法理解为什么会发生这种情况以及在哪里使用什么。我也尝试过将 RDD 创建为
val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// Then also same issue occurs with a+1 but every thing works fine with a+b
【问题讨论】:
标签: scala hadoop apache-spark