一、概述

Spark Core、Spark-SQL与Spark-Streaming都是相同的,编写好之后打成jar包使用spark-submit命令提交到集群运行应用
$SPARK_HOME/bin#./spark-submit  --master spark://Master01:7077  --class MainClassFullName [--files $HIVE_HOME/conf/hive-site.xml] JarNameFullPath [slices]

说明:
--master参数用于指定提交到的Spark集群入口,这个入口通常是Spark的Master节点(即Master进程或ResourceManager进程所在的节点),如果需要为该参数指定一个高可用集群则集群节点之间使用英文逗号分割
--class参数用于指定Spark之Driver的入口Main类(必须指定该Main类的全名)
如果使用Spark操作Hive仓库则需要使用--files参数指定Hive的配置文件
如果使用Spark操作关系数据库则需要将关系数据库的驱动包放置于Spark安装目录下的library目录下(在Spark2.x中应该放置于jars目录下),如:
[hadoop@CloudDeskTop jars]$ pwd
/software/spark-2.1.1/jars
JarNameFullPath表示的是提交的Spark应用所在的JAR包全名(最好指定为绝对的全路径)
slices:表示的是读取数据的并行度(值为一个数值,根据实际的物理内存配置来指定,内存较小时指定为1或者不用指定),一般在Streaming应用中是不需要指定的

二、Spark之JDBC实战

(一)、本地模式操作

典型业务场景描述:将CloudDeskTop客户端本地的数据,通过Spark处理,然后将结果写入远端关系数据库中,供前端在线事务系统使用

1、在Eclipse4.5中建立工程RDDToJDBC,并创建一个文件夹lib用于放置第三方驱动包

[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src

2、添加必要的环境

2.1、将MySql的jar包拷贝到工程目录RDDToJDBC下的lib目录下
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.1、将Spark的开发库Spark2.1.1-All追加到RDDToJDBC工程的classpath路径中去(可以通过添加用户库的方式来解决);Spark2.1.1-All中包含哪些包,请点击此处

3、基于RDD到DB的Java源码

  1 package com.mmzs.bigdata.spark.core.local;
  2 
  3 import java.io.File;
  4 import java.sql.Connection;
  5 import java.sql.DriverManager;
  6 import java.sql.PreparedStatement;
  7 import java.sql.SQLException;
  8 import java.util.Arrays;
  9 import java.util.Iterator;
 10 import java.util.List;
 11 
 12 import org.apache.spark.SparkConf;
 13 import org.apache.spark.api.java.JavaPairRDD;
 14 import org.apache.spark.api.java.JavaRDD;
 15 import org.apache.spark.api.java.JavaSparkContext;
 16 import org.apache.spark.api.java.function.FlatMapFunction;
 17 import org.apache.spark.api.java.function.Function2;
 18 import org.apache.spark.api.java.function.PairFunction;
 19 import org.apache.spark.api.java.function.VoidFunction;
 20 
 21 import scala.Tuple2;
 22 import scala.Tuple4;
 23 
 24 public class RDDToDB {
 25     /**
 26      * 全局计数器
 27      */
 28     private static int count;
 29     
 30     /**
 31      * 数据库连接
 32      */
 33     private static Connection conn;
 34     
 35     /**
 36      * 预编译语句
 37      */
 38     private static PreparedStatement pstat;
 39     
 40     private static final File OUT_PATH=new File("/home/hadoop/test/output");
 41     
 42     static{
 43         delDir(OUT_PATH);
 44         try {
 45             String sql="insert into wordcount(word,count) values(?,?)";
 46             String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
 47             Class.forName("com.mysql.jdbc.Driver");
 48             conn=DriverManager.getConnection(url, "root", "123456");
 49             pstat=conn.prepareStatement(sql);
 50         } catch (ClassNotFoundException e) {
 51             e.printStackTrace();
 52         } catch (SQLException e) {
 53             e.printStackTrace();
 54         }
 55     }
 56     /**
 57      * 删除任何目录或文件
 58      * @param f
 59      */
 60     private static void delDir(File f){
 61         if(!f.exists())return;
 62         if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){
 63             f.delete();
 64             return;
 65         }
 66         File[] files=f.listFiles();
 67         for(File fp:files)delDir(fp);
 68         f.delete();
 69     }
 70     
 71     //分批存储
 72     private static void batchSave(Tuple2<String, Integer> line,boolean isOver){
 73         try{
 74             pstat.setString(1, line._1());
 75             pstat.setInt(2, line._2());
 76             
 77             if(isOver){//如果结束了循环则直接写磁盘
 78                 pstat.addBatch();
 79                 pstat.executeBatch();
 80                 pstat.clearBatch();
 81                 pstat.clearParameters();
 82             }else{ //如果没有结束则将sql语句添加到批处理中去
 83                 pstat.addBatch();
 84                 count++;
 85                 if(count%100==0){ //如果满一个批次就提交一次批处理操作
 86                     pstat.executeBatch();
 87                     pstat.clearBatch();
 88                     pstat.clearParameters();
 89                 }
 90             }
 91         }catch(SQLException e){
 92             e.printStackTrace();
 93         }
 94     }
 95     
 96     /**
 97      * 将RDD集合中的数据存储到关系数据库MYSql中去
 98      * @param statResRDD
 99      */
100     private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){
101         final long rddNum=statResRDD.count();
102         statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){
103             private long count=0;
104             @Override
105             public void call(Tuple2<String, Integer> line) throws Exception {
106                 if(++count<rddNum){
107                     batchSave(line,false);
108                 }else{
109                     batchSave(line,true);
110                 }
111             }
112         });
113         
114         try{
115             if(null!=pstat)pstat.close();
116             if(null!=conn)conn.close();
117         }catch(SQLException e){
118             e.printStackTrace();
119         }
120     }
121     
122     public static void main(String[] args) {
123         SparkConf conf=new SparkConf();
124         conf.setAppName("Java Spark local");
125         conf.setMaster("local");
126         
127         //根据Spark配置生成Spark上下文
128         JavaSparkContext jsc=new JavaSparkContext(conf);
129         
130         //读取本地的文本文件成内存中的RDD集合对象
131         JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc");
132         
133         //切分每一行的字串为单词数组,并将字串数组中的单词字串释放到外层的JavaRDD集合中
134         JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){
135             @Override
136             public Iterator<String> call(String line) throws Exception {
137                 String[] words=line.split(" ");
138                 List<String> list=Arrays.asList(words);
139                 Iterator<String> its=list.iterator();
140                 return its;
141             }
142         });
143         
144         //为JavaRDD集合中的每一个单词进行计数,将其转换为元组
145         JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){
146             @Override
147             public Tuple2<String,Integer> call(String word) throws Exception {
148                 return new Tuple2<String,Integer>(word,1);
149             }
150         });
151         
152         //根据元组中的第一个元素(Key)进行分组并统计单词出现的次数
153         JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
154             @Override
155             public Integer call(Integer pre, Integer next) throws Exception {
156                 return pre+next;
157             }
158         });
159         
160         //将单词元组中的元素反序以方便后续排序
161         JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){
162             @Override
163             public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
164                 return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1);
165             }
166         });
167         
168         //将JavaRDD集合中的单词按出现次数进行将序排列
169         JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1);
170         
171         //排序之后将元组中的顺序换回来
172         JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){
173             @Override
174             public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
175                 return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1);
176             }
177         });
178         
179         //存储统计之后的结果到磁盘文件中去
180         //mapRdd03.saveAsTextFile("/home/hadoop/test/jdbc/output");
181         
182         saveToDB(mapRdd03);
183         
184         //关闭Spark上下文
185         jsc.close();
186     }
187 }
View Code

相关文章:

  • 2021-11-08
  • 2021-09-17
  • 2021-05-19
  • 2021-05-26
  • 2021-12-21
  • 2021-06-08
猜你喜欢
  • 2022-02-12
  • 2021-06-27
  • 2021-12-18
  • 2021-08-29
  • 2021-11-19
  • 2021-05-20
  • 2021-11-18
相关资源
相似解决方案