Spark Streaming介绍:
基于Spark之上的流处理(rdd)
流:source ==> compute ==> store
离线是特殊的流
letting you write streaming jobs,the same way you write batch jobs
out of the box 开箱即用 OOTB(内置的)
Steanming、Core 、SQL比较:
Steanming : DStream <= represents a continuous stream of data
Core:RDD
SQL: DF/DS
Streaming入口:StreamingContext
Core:SparkContext
SQL:
SparkSession
SQLContext/HiveContext
编程模型一 socketTextStream:
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
该连接模式有receiver,如下图所示:
由以下提示可知,socket: receiver占用一个core
18/09/07 22:42:41 WARN StreamingContext:
spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
对DStream做一个操作,其实就是对这个DStream底层的所有RDD都做相同的操作
Direct模式:
编程模型二 textFileStream(无Receiver):
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/streaming/input/")
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
编程模型三 CheckPoint&updateStateByKey:
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val curr = currentValues.sum
val pre = preValues.getOrElse(0)
Some(curr + pre)
}
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/streaming/checkpoint/")
val lines = ssc.socketTextStream("hadoop000",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val result = pairs.updateStateByKey(updateFunction)
result.print()
ssc.start()
ssc.awaitTermination()
打包提交:
./spark-submit \
--master local[2] \
--name StreamingStateApp \
--class com.ruozedata.spark.streaming.day01.StreamingStateApp \
/home/hadoop/lib/g3-spark-1.0.jar
即可累加计算
WAL(WriteAheadLog):
WAL框架及实现