一、概述
Spark Core、Spark-SQL与Spark-Streaming都是相同的,编写好之后打成jar包使用spark-submit命令提交到集群运行应用
$SPARK_HOME/bin#./spark-submit --master spark://Master01:7077 --class MainClassFullName [--files $HIVE_HOME/conf/hive-site.xml] JarNameFullPath [slices]
说明:
--master参数用于指定提交到的Spark集群入口,这个入口通常是Spark的Master节点(即Master进程或ResourceManager进程所在的节点),如果需要为该参数指定一个高可用集群则集群节点之间使用英文逗号分割
--class参数用于指定Spark之Driver的入口Main类(必须指定该Main类的全名)
如果使用Spark操作Hive仓库则需要使用--files参数指定Hive的配置文件
如果使用Spark操作关系数据库则需要将关系数据库的驱动包放置于Spark安装目录下的library目录下(在Spark2.x中应该放置于jars目录下),如:
[hadoop@CloudDeskTop jars]$ pwd
/software/spark-2.1.1/jars
JarNameFullPath表示的是提交的Spark应用所在的JAR包全名(最好指定为绝对的全路径)
slices:表示的是读取数据的并行度(值为一个数值,根据实际的物理内存配置来指定,内存较小时指定为1或者不用指定),一般在Streaming应用中是不需要指定的
二、Spark之JDBC实战
(一)、本地模式操作
典型业务场景描述:将CloudDeskTop客户端本地的数据,通过Spark处理,然后将结果写入远端关系数据库中,供前端在线事务系统使用
1、在Eclipse4.5中建立工程RDDToJDBC,并创建一个文件夹lib用于放置第三方驱动包
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src
2、添加必要的环境
2.1、将MySql的jar包拷贝到工程目录RDDToJDBC下的lib目录下
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.1、将Spark的开发库Spark2.1.1-All追加到RDDToJDBC工程的classpath路径中去(可以通过添加用户库的方式来解决);Spark2.1.1-All中包含哪些包,请点击此处
3、基于RDD到DB的Java源码
1 package com.mmzs.bigdata.spark.core.local; 2 3 import java.io.File; 4 import java.sql.Connection; 5 import java.sql.DriverManager; 6 import java.sql.PreparedStatement; 7 import java.sql.SQLException; 8 import java.util.Arrays; 9 import java.util.Iterator; 10 import java.util.List; 11 12 import org.apache.spark.SparkConf; 13 import org.apache.spark.api.java.JavaPairRDD; 14 import org.apache.spark.api.java.JavaRDD; 15 import org.apache.spark.api.java.JavaSparkContext; 16 import org.apache.spark.api.java.function.FlatMapFunction; 17 import org.apache.spark.api.java.function.Function2; 18 import org.apache.spark.api.java.function.PairFunction; 19 import org.apache.spark.api.java.function.VoidFunction; 20 21 import scala.Tuple2; 22 import scala.Tuple4; 23 24 public class RDDToDB { 25 /** 26 * 全局计数器 27 */ 28 private static int count; 29 30 /** 31 * 数据库连接 32 */ 33 private static Connection conn; 34 35 /** 36 * 预编译语句 37 */ 38 private static PreparedStatement pstat; 39 40 private static final File OUT_PATH=new File("/home/hadoop/test/output"); 41 42 static{ 43 delDir(OUT_PATH); 44 try { 45 String sql="insert into wordcount(word,count) values(?,?)"; 46 String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"; 47 Class.forName("com.mysql.jdbc.Driver"); 48 conn=DriverManager.getConnection(url, "root", "123456"); 49 pstat=conn.prepareStatement(sql); 50 } catch (ClassNotFoundException e) { 51 e.printStackTrace(); 52 } catch (SQLException e) { 53 e.printStackTrace(); 54 } 55 } 56 /** 57 * 删除任何目录或文件 58 * @param f 59 */ 60 private static void delDir(File f){ 61 if(!f.exists())return; 62 if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){ 63 f.delete(); 64 return; 65 } 66 File[] files=f.listFiles(); 67 for(File fp:files)delDir(fp); 68 f.delete(); 69 } 70 71 //分批存储 72 private static void batchSave(Tuple2<String, Integer> line,boolean isOver){ 73 try{ 74 pstat.setString(1, line._1()); 75 pstat.setInt(2, line._2()); 76 77 if(isOver){//如果结束了循环则直接写磁盘 78 pstat.addBatch(); 79 pstat.executeBatch(); 80 pstat.clearBatch(); 81 pstat.clearParameters(); 82 }else{ //如果没有结束则将sql语句添加到批处理中去 83 pstat.addBatch(); 84 count++; 85 if(count%100==0){ //如果满一个批次就提交一次批处理操作 86 pstat.executeBatch(); 87 pstat.clearBatch(); 88 pstat.clearParameters(); 89 } 90 } 91 }catch(SQLException e){ 92 e.printStackTrace(); 93 } 94 } 95 96 /** 97 * 将RDD集合中的数据存储到关系数据库MYSql中去 98 * @param statResRDD 99 */ 100 private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){ 101 final long rddNum=statResRDD.count(); 102 statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){ 103 private long count=0; 104 @Override 105 public void call(Tuple2<String, Integer> line) throws Exception { 106 if(++count<rddNum){ 107 batchSave(line,false); 108 }else{ 109 batchSave(line,true); 110 } 111 } 112 }); 113 114 try{ 115 if(null!=pstat)pstat.close(); 116 if(null!=conn)conn.close(); 117 }catch(SQLException e){ 118 e.printStackTrace(); 119 } 120 } 121 122 public static void main(String[] args) { 123 SparkConf conf=new SparkConf(); 124 conf.setAppName("Java Spark local"); 125 conf.setMaster("local"); 126 127 //根据Spark配置生成Spark上下文 128 JavaSparkContext jsc=new JavaSparkContext(conf); 129 130 //读取本地的文本文件成内存中的RDD集合对象 131 JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc"); 132 133 //切分每一行的字串为单词数组,并将字串数组中的单词字串释放到外层的JavaRDD集合中 134 JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){ 135 @Override 136 public Iterator<String> call(String line) throws Exception { 137 String[] words=line.split(" "); 138 List<String> list=Arrays.asList(words); 139 Iterator<String> its=list.iterator(); 140 return its; 141 } 142 }); 143 144 //为JavaRDD集合中的每一个单词进行计数,将其转换为元组 145 JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){ 146 @Override 147 public Tuple2<String,Integer> call(String word) throws Exception { 148 return new Tuple2<String,Integer>(word,1); 149 } 150 }); 151 152 //根据元组中的第一个元素(Key)进行分组并统计单词出现的次数 153 JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){ 154 @Override 155 public Integer call(Integer pre, Integer next) throws Exception { 156 return pre+next; 157 } 158 }); 159 160 //将单词元组中的元素反序以方便后续排序 161 JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){ 162 @Override 163 public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception { 164 return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1); 165 } 166 }); 167 168 //将JavaRDD集合中的单词按出现次数进行将序排列 169 JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1); 170 171 //排序之后将元组中的顺序换回来 172 JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){ 173 @Override 174 public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception { 175 return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1); 176 } 177 }); 178 179 //存储统计之后的结果到磁盘文件中去 180 //mapRdd03.saveAsTextFile("/home/hadoop/test/jdbc/output"); 181 182 saveToDB(mapRdd03); 183 184 //关闭Spark上下文 185 jsc.close(); 186 } 187 }