本次总结图如下
SparkSql可以自定义函数、聚合函数、开窗函数
作用说明:自定义一个函数,并且注册本身,这样就能在SQL语句中使用
使用方式sqlContext.udf().register(函数名,函数(输入,输出),返回类型))
代码
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("udf").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); List<String> strs = Arrays.asList("yarn", "Marry", "Jack", "To m", "tom"); JavaRDD<String> rdd = sc.parallelize(strs); JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() { @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List<StructField> structTypes = new ArrayList<>(); structTypes.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(structTypes); DataFrame df = sqlContext.createDataFrame(rowRdd, structType); df.show(); /** *自定义一个函数,并且注册本身 */ sqlContext.udf().register("strLen", new UDF1<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); } }, DataTypes.IntegerType); df.registerTempTable("nameTB"); sqlContext.sql("select name,strLen(name) from nameTB").show(); sc.stop(); }SparkStreaming:流式处理框架
SparkStreaming vs Storm
1、Storm是纯实时(来一条处理一条)的处理框架,SparkStreamIng是准实时(间隔一秒)的流式处理框架,sotrm处理数据吞吐量不如sparkStreaming高
Storm接收来的数据是一条一条传输到节点执行,SpakStreaming会攥一段的数据,然后传输到节点执行
2、Storm对于事物的支持度要比SparkStreaming要好,对于流式处理框架,事物指的是:数据刚好被执行一次
3、SparkStreaming中可以使用sql来处理数据,Storm不行
SparkStreaming适合做复杂业务逻辑,Storm适合做简单的汇总型的计算
SparkStreaming算子
foreachRDD , transform ,UpdateStateByKey(累加) ,reduceBykeyAndWindow(窗口)
1、foreachRDD和transform能够从DSStream中将RDD抽取出来
2、foreach是一个out operator算子,transform是一个transform算子
3、updateStateByKey为已经存在的key进行state的状态更新
4、reduceBykeyAndWindow:基于滑动窗口的热点搜索词实时统计,会处理一段时间之类的数据,处理的间隔时长数据+处理的间隔时间
重复计算逻辑和图
重复计算问题解决逻辑图
注意点:
1、local的模拟线程数不许大于2、因为一条线程被receiver(接受数据的线程占用了),另外一个线程是job执行
2、Durations时间的设置,就是我们能接受的延迟度、时间间隔要根据集群资源、job执行时间设置,
3、业务逻辑完成,需要有一个output operator类算子(类似action类算子)
4、javaStreamingContext.start()
5、javaStreamingContext.stop() 无参的sotp方法会将sparkContext一同关闭 ,停止后不能再调用start,不关闭使用stop(false)
代码实战:transform 和 updateStateByKey
public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("tst") /** * 如果携程local,则只会启动一个线程, * 对于sparkStrem,默认会有一个线程, 会一直监听server发送的数据,那么会这用这一个线程, * 每隔5秒发启动一个job线程处理请求过来的数据,无法执行,所以需要设置为local[2],另启动一个线程 */ .setMaster("local[2]"); JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5)); JavaReceiverInputDStream<String> str = streamingContext.socketTextStream("localhost", 9999); JavaPairDStream<String, Integer> rdd = str.transformToPair(new Function<JavaRDD<String>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<String> stringJavaRDD) throws Exception { return stringJavaRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); } }); /** * 多久会将内存中的数据(每一个key所对应的状态)写入到磁盘上一份呢? * 如果你的batch interval小于10s 那么10s会将内存中的数据写入到磁盘一份 * 如果bacth interval 大于10s,那么就以bacth interval为准 */ streamingContext.checkpoint("sscheckpoint01"); JavaPairDStream<String, Integer> keysDS=rdd.updateStateByKey /** * par1: 本次通过key分组后的值集合 * par2: 上一次记录的值 * par3: 返回的值 */ (new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call /** *par1: 本次通过key分组后的值集合 * par2: 上一次记录的值 */ (List<Integer> par1, Optional<Integer> par2) throws Exception { Integer result=par2.isPresent() ? par2.get():0; for(Integer i : par1) result =result+ i; return Optional.of(result); } }); str.print(); System.out.println("------------------------------------"); rdd.print(); System.out.println("------------------------------------"); keysDS.print(); streamingContext.start(); streamingContext.awaitTermination(); }代码实战reduceByWindow算子
public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("tes").setMaster("local[2]"); JavaStreamingContext spj = new JavaStreamingContext(conf, Durations.seconds(5)); spj.checkpoint("sscheckpoint01"); JavaReceiverInputDStream<String> receiverInputDStream = spj.socketTextStream("localhost", 9999); JavaDStream<String> transform = receiverInputDStream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { @Override public JavaRDD<String> call(JavaRDD<String> stringJavaRDD) throws Exception { return stringJavaRDD; } }); /** * 每隔10秒,计算最近60秒内的数据,那么这个窗口大小就是60秒,里面有12个rdd,在没有计算之前,这些rdd是不会进行计算的。 * 那么在计算的时候会将这12个rdd聚合起来,然后一起执行reduceByKeyAndWindow 操作 * ,reduceByKeyAndWindow是针对窗口操作的而不是针对DStream操作的。 */ // JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() { // @Override // public String call(String s, String s2) throws Exception { // return s + "_" + s2; // } // }, // /** // * 窗口长度 , 窗口滑动时间 // */ // Durations.seconds(15), Durations.seconds(10)); //优化后的代码 JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() { @Override /** * 将讲个时间数据累加 */ public String call(String s, String s2) throws Exception { return s + "_" + s2; } }, new Function2<String, String, String>() { @Override /** * 使用优化代码,避免重复计算,现将上一次的计算结果和本次累加, * 然后去掉上一次结算结果头号计算 */ public String call(String s, String s2) throws Exception { return s.replace(s2,""); } }, /** * 窗口长度 , 窗口滑动时间 */ Durations.seconds(15), Durations.seconds(10)); stringJavaDStream.print(); spj.start(); spj.awaitTermination(); }