Spark Streaming overview:
    stream processing built on top of Spark (on RDDs)
    a stream: source ==> compute ==> store
    batch (offline) processing is a special case of streaming
 
letting you write streaming jobs the same way you write batch jobs
 
out of the box (OOTB, i.e. built in)

Comparing Streaming, Core, and SQL abstractions:
Streaming: DStream  <=  represents a continuous stream of data
Core: RDD
SQL: DF/DS

Streaming entry point: StreamingContext
Core: SparkContext
SQL:
    SparkSession
    SQLContext/HiveContext
    
Programming model 1: socketTextStream
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))       // 5-second batch interval
val lines = ssc.socketTextStream("localhost", 9999)  // receiver-based socket source
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()              // start the computation
ssc.awaitTermination()   // wait for the computation to terminate
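The per-batch logic of the pipeline above can be sketched with plain Scala collections (a toy stand-in for one micro-batch's RDD, not Spark itself; `groupBy` plays the role of the shuffle behind `reduceByKey`):

```scala
// One micro-batch's worth of lines, standing in for an RDD[String].
val lines = Seq("spark streaming", "spark core")

// Same transformation chain as the DStream pipeline above.
val wordCounts = lines
  .flatMap(_.split(" "))              // split lines into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .groupBy(_._1)                      // group pairs by word (shuffle stand-in)
  .map { case (w, ps) => (w, ps.map(_._2).sum) }  // sum counts per word

// wordCounts: Map(spark -> 2, streaming -> 1, core -> 1)
```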

This connection mode uses a receiver:

As the following warning indicates, the socket receiver occupies one core:
18/09/07 22:42:41 WARN StreamingContext: 
spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
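The warning means the receiver permanently occupies one core, so at least one more must be free to process batches. A minimal sketch of setting the master explicitly (assuming a standalone application rather than spark-shell; the app name is hypothetical):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one core for the receiver, at least one for processing.
// With local[1] the receiver takes the only core and no batches run.
val conf = new SparkConf()
  .setAppName("SocketWordCount")  // hypothetical app name
  .setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
```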

Applying an operation to a DStream really means applying that same operation to every RDD underlying the DStream.
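A toy illustration of this point in plain Scala (a model, not Spark internals): think of a DStream as a sequence of per-interval batches, each batch standing in for one RDD; a DStream transformation is the same function mapped over every batch.

```scala
// Toy model: a "DStream" as a sequence of micro-batches.
val dstream: Seq[Seq[String]] = Seq(
  Seq("a b", "a"),  // batch at time t1
  Seq("b b")        // batch at time t2
)

// "flatMap on the DStream" = the same flatMap applied to each underlying batch.
val wordsPerBatch = dstream.map(batch => batch.flatMap(_.split(" ")))
// wordsPerBatch: Seq(Seq(a, b, a), Seq(b, b))
```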

Direct mode:
Programming model 2: textFileStream (no receiver)

import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))      // 10-second batch interval
val lines = ssc.textFileStream("/streaming/input/")  // monitors the directory for new files; no receiver, no extra core
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

 

Programming model 3: checkpoint & updateStateByKey
// Merge this batch's values for a key with the previously accumulated state.
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  val curr = currentValues.sum      // sum of this batch's counts for the key
  val pre = preValues.getOrElse(0)  // previously accumulated count (0 if none)
  Some(curr + pre)
}
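The update function can be checked in isolation with plain Scala, simulating two consecutive batches for a single key:

```scala
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  val curr = currentValues.sum
  val pre = preValues.getOrElse(0)
  Some(curr + pre)
}

// Batch 1: the key appears 3 times, no previous state.
val s1 = updateFunction(Seq(1, 1, 1), None)  // Some(3)
// Batch 2: the key appears twice more; previous state is carried forward.
val s2 = updateFunction(Seq(1, 1), s1)       // Some(5)
```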

import org.apache.spark._
import org.apache.spark.streaming._    
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/streaming/checkpoint/")  // required: updateStateByKey needs a checkpoint directory to persist state
val lines = ssc.socketTextStream("hadoop000",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val result = pairs.updateStateByKey(updateFunction)
result.print()
ssc.start()
ssc.awaitTermination()

Package and submit:
./spark-submit \
--master local[2] \
--name StreamingStateApp \
--class com.ruozedata.spark.streaming.day01.StreamingStateApp \
/home/hadoop/lib/g3-spark-1.0.jar

This yields cumulative (stateful) counts across batches.

WAL (Write-Ahead Log):
WAL framework and implementation
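A sketch of enabling the receiver WAL (assuming a receiver-based source; the config key is `spark.streaming.receiver.writeAheadLog.enable`). With the WAL on, received blocks are written to the checkpoint directory before being acknowledged, so they can be replayed after a driver failure:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingWALApp")  // hypothetical app name
  .setMaster("local[2]")
  // Persist received blocks to the write-ahead log before processing.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL files live under the checkpoint directory, so checkpointing is required.
ssc.checkpoint("/streaming/checkpoint/")
```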
