wordcount
1 package examples.wordcount;
2
3 import org.apache.spark.SparkConf;
4 import org.apache.spark.api.java.JavaSparkContext;
5 import org.apache.spark.api.java.function.Function2;
6 import org.apache.spark.api.java.function.PairFunction;
7 import org.apache.spark.api.java.function.VoidFunction;
8 import scala.Tuple2;
9
10 import java.util.*;
11 public class WordCount {
12 public static void main(String[] args) {
13 SparkConf sparkConf = new SparkConf();
14 sparkConf.setAppName("wordcount").setMaster("local[*]");
15 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
16 List<String> list = Arrays.asList("hello","hello","world");
17 jsc.parallelize(list,2).mapToPair(new PairFunction<String, String, Integer>() {
18 @Override
19 public Tuple2 call(String obj) {
20 return new Tuple2<String,Integer>(obj,new Integer(1));
21 }
22 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
23 @Override
24 public Integer call(Integer i1,Integer i2) {
25 return i1+i2;
26 }
27 }).sortByKey(false).foreach(new VoidFunction<Tuple2<String, Integer>>() {
28 @Override
29 public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
30 System.out.println(stringIntegerTuple2._1+":"+stringIntegerTuple2._2);
31 }
32 });
33 }
34 }
top-k


1 package examples.topk;
2
3 import org.apache.spark.SparkConf;
4 import org.apache.spark.api.java.JavaSparkContext;
5 import org.apache.spark.api.java.function.Function;
6 import org.apache.spark.api.java.function.Function2;
7 import org.apache.spark.api.java.function.PairFunction;
8 import scala.Serializable;
9 import scala.Tuple2;
10
11 import java.util.Arrays;
12 import java.util.Comparator;
13 import java.util.List;
14
15 public class TopK {
16 public static void main(String[] args) {
17 SparkConf sparkConf = new SparkConf();
18 sparkConf.setMaster("local[*]").setAppName("Top-k");
19 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
20 List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "a", "b", "c", "d");
21 List<Tuple2<String, Integer>> topList = jsc.parallelize(list, 5).mapToPair(new PairFunction<String, String, Integer>() {
22 @Override
23 public Tuple2 call(String key) {
24 return new Tuple2<String, Integer>(key, new Integer(1));
25 }
26 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
27 @Override
28 public Integer call(Integer integer, Integer integer2) throws Exception {
29 return integer + integer2;
30 }
31 }).filter(new Function<Tuple2<String, Integer>, Boolean>(){
32 @Override
33 public Boolean call(Tuple2<String,Integer> tuple2) {
34 return tuple2._1!=null;
35 }
36
37 }).top(5,new TupleComparator());
38 System.out.println();
39 for (Tuple2<String, Integer> tuple2 : topList) {
40 System.out.println(tuple2._1 + ":" + tuple2._2);
41 }
42 }
43 //top算子和takeOrdered算子的比较器结果刚好相反
44 public static class TupleComparator implements
45 Comparator<Tuple2<String, Integer>>, Serializable {
46
47 private static final long serialVersionUID = 1L;
48
49 @Override
50 public int compare(Tuple2<String, Integer> o1,
51 Tuple2<String, Integer> o2) {
52 return Integer.compare(o1._2(), o2._2());
53 }
54 }
55
56
57 }
top-k
pagerank


1 package examples.pagerank;
2
3 import org.apache.spark.api.java.JavaPairRDD;
4 import org.apache.spark.api.java.JavaSparkContext;
5 import org.apache.spark.api.java.JavaRDD;
6 import org.apache.spark.SparkConf;
7 import org.apache.spark.api.java.function.Function;
8 import org.apache.spark.api.java.function.Function2;
9 import org.apache.spark.api.java.function.PairFunction;
10 import org.apache.spark.api.java.function.PairFlatMapFunction;
11 import scala.Tuple2;
12
13 import java.util.*;
14
15 public class PageRank{
16 public static void main(String[] args){
17 SparkConf conf=new SparkConf();
18 conf.setAppName("pagerank");
19 conf.setMaster("local");
20 conf.set("spark.testing.memory", "500000000");//设置运行内存大小
21 JavaSparkContext sc=new JavaSparkContext(conf);
22 //partitionBy()只对kv RDD起作用, 进行该操作后,将相同key值的数据放到同一机器上,并进行持久化操作,对后续循环中的join操作进行优化,使得省去join操作 shuffle的开销
23 ArrayList<String> list=new ArrayList<String>(4);
24 list.add("A,D");//网页之间的连接关系,A页面链接到网页D
25 list.add("B,A");
26 list.add("C,A,B");
27 list.add("D,A,C");
28 JavaRDD<String> links=sc.parallelize(list);
29 JavaPairRDD<Character,char[]> pairs =links.mapToPair(new PairFunction<String, Character, char[]>() {
30 public Tuple2<Character,char[]> call(String s) {
31 String[] str=s.split(",");
32 char[] ch=new char[str.length];
33 for (int i=0;i<str.length;i++){
34 ch[i]=str[i].charAt(0);
35 }
36 return new Tuple2<Character,char[]>(s.charAt(0),ch );
37 }//将字符串中保存的页面的链接关系map转换成key-values形式,key当前页面,指向的页面集合用数组表示
38 }).cache();//持久化
39 JavaPairRDD<Character,Float> ranks=sc.parallelize(Arrays.asList(\'A\',\'B\',\'C\',\'D\')).mapToPair(new PairFunction<Character, Character, Float>() {
40 public Tuple2<Character,Float> call(Character character) throws Exception {
41 return new Tuple2<Character,Float>(character,new Float(1.0));
42 }//初始化页面权值是1.0
43 });
44 for(int i=0;i<10;i++){
45 JavaPairRDD<Character,Tuple2<char[],Float>> contribs=pairs.join(ranks);
46 JavaPairRDD<Character,Float> con=contribs.flatMapToPair(new PairFlatMapFunction<Tuple2<Character,Tuple2<char[],Float>>,Character,Float>(){
47 public Iterable call(Tuple2<Character,Tuple2<char[],Float>> val) throws Exception{
48 List<Tuple2<Character,Float>> list=new ArrayList<Tuple2<Character, Float>>();
49 Float f=val._2._2;
50 char[] ch=val._2._1();
51 int len=ch.length;
52 for(int i=0;i<len;i++) {
53 Tuple2<Character, Float> map = new Tuple2<Character, Float>(new Character(ch[i]), new Float(f / len));
54 list.add(map);
55 }
56 return list;
57 }
58 });//将每个页面获得其他页面的pagerank值形成键值对的形式
59 ranks=con.reduceByKey(new Function2<Float, Float, Float>() {
60 public Float call(Float a, Float b) { return a + b; }
61 }).mapValues(new Function<Float, Float>() {
62 public Float call(Float a) throws Exception {
63 return new Float(0.15+0.85*a);
64 }
65 });//当前迭代的pagerank计算
66 }
67 List<Tuple2<Character,Float>> retList = ranks.collect();
68 for(Tuple2<Character,Float> tuple2 :retList) {
69 System.out.println(tuple2._1+":"+tuple2._2);
70 }
71 }
72 }
pagerank