- map算子:将集合中的每个元素乘2
package cn.spark.study.core
import org.apache.spark.{SparkConf, SparkContext}
/** Demonstrates the `map` transformation: double every element of a collection. */
object transformationOpertion {
  def main(args: Array[String]): Unit = {
    map()
  }

  /** map operator: multiply each element of the collection by 2 and print the results. */
  def map(): Unit = {
    // Run locally on one thread; the app name appears in the Spark UI.
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    // Single partition so the printed order is deterministic.
    val numberRDD = sc.parallelize(numbers, 1)
    val doubled = numberRDD.map(x => x * 2)
    // foreach is an action: it triggers the job. It returns Unit, so the
    // original `val result = ...` binding was meaningless and is removed.
    doubled.foreach(x => println(x))
    // Stop the context so the JVM can create another SparkContext later.
    sc.stop()
  }
}
结果:
- filter算子:过滤出集合中的偶数
package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/** Demonstrates the `filter` transformation: keep only the even numbers. */
object transformationOpertion {
  def main(args: Array[String]): Unit = {
    //map()
    filter()
  }

  /** filter operator: select the even numbers from 1..9 and print them. */
  def filter(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("filter")
    val sc = new SparkContext(conf)
    val number = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(number)
    // Keep elements whose remainder mod 2 is zero, i.e. the even ones.
    val evenNum = numRDD.filter(x => x % 2 == 0)
    // foreach returns Unit; the original `val result = ...` binding was useless.
    evenNum.foreach(x => println(x))
    // Release the SparkContext's resources.
    sc.stop()
  }
}
测试:
- flatmap:将行拆分为单词
/** flatMap operator: split each line into words and flatten into one RDD of words. */
def flatmap(): Unit = {
  val conf = new SparkConf().setAppName("flatmap").setMaster("local")
  val sc = new SparkContext(conf)
  val strs = Array("hello world", "ni hao", "ha ni shi")
  val strRDD = sc.parallelize(strs)
  // Each line maps to an array of words; flatMap concatenates those arrays.
  val flat = strRDD.flatMap(x => x.split(" "))
  // foreach returns Unit; the original `val result = ...` binding was useless.
  flat.foreach(x => println(x))
  // Release the SparkContext's resources.
  sc.stop()
}
测试:
- groupByKey:将每个班级的成绩进行分组
/** groupByKey operator: group the scores of each class together and print them. */
def groupBykey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("groupBykey")
  val sc = new SparkContext(conf)
  // (className, score) pairs; tuple literals instead of explicit Tuple2(...).
  val scoreList = Array(("jike1", 99), ("ruangong1", 34), ("jike1", 80), ("ruangong1", 78))
  val scoreRDD = sc.parallelize(scoreList)
  // groupByKey yields (className, Iterable[score]).
  val grouped = scoreRDD.groupByKey()
  grouped.foreach { case (className, scores) =>
    println(className)
    // Inner parameter renamed: the original reused `x`, shadowing the outer lambda's `x`.
    scores.foreach(score => println(score))
    println("==========================")
  }
  // Release the SparkContext's resources.
  sc.stop()
}
}
测试:
- reduceByKey:统计每个班级的总分
/** reduceByKey operator: compute the total score of each class and print it. */
def rdcbykey(): Unit = {
  // Bug fix: app name was copy-pasted as "groupBykey" from the previous example.
  val conf = new SparkConf().setMaster("local").setAppName("rdcbykey")
  val sc = new SparkContext(conf)
  // (className, score) pairs; tuple literals instead of explicit Tuple2(...).
  val scoreList = Array(("jike1", 99), ("ruangong1", 34), ("jike1", 80), ("ruangong1", 78))
  val sRDD = sc.parallelize(scoreList)
  // Sum all scores sharing the same key: per-class total.
  val result = sRDD.reduceByKey(_ + _)
  result.foreach(x => println(x._1 + " : " + x._2))
  // Release the SparkContext's resources.
  sc.stop()
}
}
测试:
- sortByKey:将学生分数进行排序
/** sortByKey operator: sort student scores in descending order and print them. */
def sortByKey(): Unit = { // explicit `: Unit =` replaces the deprecated procedure syntax
  val conf = new SparkConf()
    .setAppName("sortByKey")
    .setMaster("local")
  val sc = new SparkContext(conf)
  // (score, name) pairs — the score is the key being sorted on.
  val scoreList = Array((65, "leo"), (50, "tom"), (100, "marry"), (85, "jack"))
  // Single partition so the printed order matches the sort order.
  val scores = sc.parallelize(scoreList, 1)
  // ascending = false => descending by score.
  val sortedScores = scores.sortByKey(ascending = false)
  sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
  // Release the SparkContext's resources.
  sc.stop()
}
测试:
- join:打印每个学生的成绩
- cogroup:打印每个学生的成绩(与join类似,但会把两侧相同key的所有值分别聚合成集合)
/** join operator: inner-join students with scores on student id and print each pairing. */
def join(): Unit = { // explicit `: Unit =` replaces the deprecated procedure syntax
  val conf = new SparkConf()
    .setAppName("join")
    .setMaster("local")
  val sc = new SparkContext(conf)
  // (studentId, name) and (studentId, score) pairs; semicolons dropped,
  // tuple literals instead of explicit Tuple2(...).
  val studentList = Array((1, "leo"), (2, "jack"), (3, "tom"))
  val scoreList = Array((1, 100), (2, 90), (3, 60))
  val students = sc.parallelize(studentList)
  val scores = sc.parallelize(scoreList)
  // Inner join on the key: each element is (id, (name, score)).
  val studentScores = students.join(scores)
  studentScores.foreach { studentScore =>
    println("student id: " + studentScore._1)
    println("student name: " + studentScore._2._1)
    // Bug fix: output label typo "socre" -> "score".
    println("student score: " + studentScore._2._2)
    println("=======================================")
  }
  // Release the SparkContext's resources.
  sc.stop()
}
测试: