Prologue

Getting to know Spark: a minimal word-count example written against the Java API.
package cn.zhuzi.spark;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * @Title: SparkDemoWordCount.java
 * @Package cn.zhuzi.spark
 * @Description: Spark word count
 * @author grq
 * @version Created 2018-11-15 14:54:44
 */
public class SparkDemoWordCount {
	public static void main(String[] args) throws IOException {
		String basePath = "E:/had/spark/";
		String inputPath = basePath + "12.txt";
		String outputPath = basePath + "out/a_wc";
		// saveAsTextFile fails if the output directory already exists, so delete it first
		File file = FileUtils.getFile(outputPath);
		if (file.exists()) {
			FileUtils.deleteDirectory(file);
		}
		wordCount(inputPath, outputPath);
	}

	@SuppressWarnings("serial")
	public static void wordCount(String inputPath, String outputPath) {
		// Create the Spark context
		JavaSparkContext sc = SparkUtils.getContext();
		// Read the input data
		JavaRDD<String> input = sc.textFile(inputPath);
		// Split each line into words
		JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
			public Iterator<String> call(String line) throws Exception {
				return Arrays.asList(line.split(" ")).iterator();
			}
		});
		// Map each word to a (word, 1) pair, then sum the counts per word
		JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String word) throws Exception {
				return new Tuple2<String, Integer>(word, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer x, Integer y) throws Exception {
				return x + y;
			}
		});
		// Write the result to the output directory (it must not already exist)
		counts.saveAsTextFile(outputPath);
		sc.close();
	}
}
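Since spark-core_2.11 targets Spark 2.x (where FlatMapFunction returns an Iterator, as above), the same pipeline can be written much more compactly with Java 8 lambdas. A minimal sketch under that assumption, reusing the SparkUtils helper below; the output directory a_wc_lambda is a hypothetical name chosen for this example:

package cn.zhuzi.spark;

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkLambdaWordCount {
	public static void main(String[] args) {
		JavaSparkContext sc = SparkUtils.getContext();
		JavaPairRDD<String, Integer> counts = sc.textFile("E:/had/spark/12.txt")
				// split each line into words
				.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
				// pair each word with an initial count of 1
				.mapToPair(word -> new Tuple2<>(word, 1))
				// sum the counts per word
				.reduceByKey((x, y) -> x + y);
		counts.saveAsTextFile("E:/had/spark/out/a_wc_lambda");
		sc.close();
	}
}

Either version produces part files in which each line is a Tuple2 rendered as (word,count), e.g. (spark,3).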
package cn.zhuzi.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @Title: SparkUtils.java
 * @Package cn.zhuzi.spark
 * @Description: SparkUtils, a small helper for creating and closing the JavaSparkContext
 * @author grq
 * @version Created 2018-11-15 14:49:33
 */
public class SparkUtils {
	static SparkConf conf;
	static JavaSparkContext sc;

	/**
	 * Initialize the SparkContext.
	 */
	static {
		// The most basic way to create a SparkContext takes only two arguments
		// (see https://www.cnblogs.com/Forever-Road/p/7351245.html):
		/**
		 * 1. Cluster URL: tells Spark how to connect to the cluster. This example
		 * uses "local", a special value that runs Spark single-threaded on the
		 * local machine without connecting to a cluster.
		 * <p/>
		 * 2. Application name: when connected to a cluster, this value helps you
		 * find your application in the cluster manager's UI.
		 */
		buildContext();
	}

	private static void buildContext() {
		conf = new SparkConf().setMaster("local").setAppName("sparkDemo");
		sc = new JavaSparkContext(conf);
	}

	/**
	 * Shut down Spark.
	 */
	public static void closeContext() {
		sc.close();
		sc = null; // allow getContext() to rebuild the context later
		// alternatively: System.exit(0);
	}

	private SparkUtils() throws Exception {
		throw new Exception("Utility class; instantiation is not allowed");
	}

	public static JavaSparkContext getContext() {
		if (sc == null) {
			buildContext();
		}
		return sc;
	}
}
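Note that setMaster("local") runs Spark in a single thread on the local machine. Spark also accepts master URLs of the form local[N] or local[*] for multi-core local runs; for example, this one-line variation of buildContext() (not in the original utility) uses all available cores:

conf = new SparkConf().setMaster("local[*]").setAppName("sparkDemo"); // local[*] = one worker thread per core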
Maven dependency (add to pom.xml):
<!-- spark -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
	<version>${spark.version}</version>
</dependency>
<!-- spark /end -->
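${spark.version} is a POM property and must be defined in the <properties> section. For example (2.2.0 is an assumption here; any Spark 2.x release built for Scala 2.11 matches the Iterator-returning flatMap used above):

<properties>
	<spark.version>2.2.0</spark.version>
</properties>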