Prologue

Getting to know Spark starts with the classic WordCount. The two classes below read a text file, split each line into words, count every word, and write the result back to disk.

package cn.zhuzi.spark;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * @Title: SparkDemoWordCount.java
 * @Package cn.zhuzi.spark
 * @Description: Spark word count demo
 * @author grq
 * @version created 2018-11-15 14:54:44
 *
 */
public class SparkDemoWordCount {

	public static void main(String[] args) throws IOException {
		String base_path = "E:/had/spark/";
		String inputPath = base_path + "12.txt";
		String outputpath = base_path + "out/a_wc";
		File file = FileUtils.getFile(outputpath);
		if (file.exists()) {
			FileUtils.deleteDirectory(file);
		}
		wordCount(inputPath, outputpath);
	}

	@SuppressWarnings("serial")
	public static void wordCount(String inputPath, String outputpath) {
		// Create the Spark context
		JavaSparkContext sc = SparkUtils.getContext();

		// Read the input data
		JavaRDD<String> input = sc.textFile(inputPath);

		// Split each line into words. In Spark 2.x, FlatMapFunction.call
		// returns an Iterator (in 1.x it returned an Iterable).
		JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
			public Iterator<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" ")).iterator();
			}
		});
		// Map each word to a (word, 1) pair, then sum the counts per key
		JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer x, Integer y) throws Exception {
				return x + y;
			}
		});
		// Save as text files. The output directory must not already exist, or
		// Spark throws an exception; each partition becomes one part-NNNNN file.
		counts.saveAsTextFile(outputpath);
		sc.close();
	}
}
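
The anonymous inner classes above are the Java 7 style; with Java 8 and Spark 2.x the same pipeline collapses into a few lambda lines. A minimal sketch under those assumptions (the method name wordCountLambda is made up for illustration; it reuses the same SparkUtils helper):

	public static void wordCountLambda(String inputPath, String outputPath) {
		JavaSparkContext sc = SparkUtils.getContext();
		sc.textFile(inputPath)
				// split each line into words
				.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
				// pair each word with 1, then sum per key
				.mapToPair(word -> new Tuple2<>(word, 1))
				.reduceByKey((x, y) -> x + y)
				// same caveat: the output directory must not exist yet
				.saveAsTextFile(outputPath);
		sc.close();
	}

For an input line such as "to be or not to be", the part files contain the tuples rendered by Tuple2.toString(), e.g. (to,2) and (be,2).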


package cn.zhuzi.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @Title: SparkUtils.java
 * @Package cn.zhuzi.spark
 * @Description: Spark context utility
 * @author grq
 * @version created 2018-11-15 14:49:33
 *
 */
public class SparkUtils {
	static SparkConf conf;
	static JavaSparkContext sc;
	/**
	 * Initialize the JavaSparkContext
	 */
	static {
		// The most basic way to create a SparkContext takes just two arguments
		// (see https://www.cnblogs.com/Forever-Road/p/7351245.html):
		/**
		 * 1. Cluster URL: tells Spark how to connect to a cluster. This demo
		 * uses "local", a special value that runs Spark single-threaded on the
		 * local machine without connecting to any cluster.
		 * <p/>
		 * 2. Application name: when you do connect to a cluster, this name
		 * identifies your application in the cluster manager's UI.
		 */
		buildContext();
	}

	private static void buildContext() {
		conf = new SparkConf().setMaster("local").setAppName("sparkDemo");
		sc = new JavaSparkContext(conf);
	}

	/**
	 * Shut down Spark; nulling the field lets getContext() rebuild it later.
	 */
	public static void closeContext() {
		if (sc != null) {
			sc.close();
			sc = null;
		}
	}

	private SparkUtils() throws Exception {
		throw new Exception("Utility class; instantiation is not allowed");
	}

	public static JavaSparkContext getContext() {
		if (sc == null) {
			buildContext();
		}
		return sc;
	}
}
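
As the comment in the static block says, "local" is just the single-threaded special case. A few other well-known values setMaster accepts, for reference (the host below is hypothetical):

	SparkConf conf = new SparkConf()
			.setMaster("local[*]")               // one worker thread per CPU core
			// .setMaster("local[4]")            // exactly four worker threads
			// .setMaster("spark://host:7077")   // a standalone cluster master
			.setAppName("sparkDemo");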

The Spark version of WordCount needs just one Maven dependency:

		<!-- spark -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<!-- spark /end -->
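
${spark.version} has to be declared in the POM's <properties> block; the value below is an assumption, pick the release your cluster actually runs. The _2.11 suffix is the Scala build of the artifact and must match every Spark dependency in the project.

		<properties>
			<!-- assumed version; align with your cluster -->
			<spark.version>2.3.2</spark.version>
		</properties>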
