1. 介绍
Spark Streaming是Spark生态系统中一个重要的框架,建立在Spark Core之上,与Spark SQL、GraphX、MLib相并列。
Spark Streaming是Spark Core的扩展应用,具有可扩展性、高吞吐量、可容错性等特点。
可以监控来自Kafka、Flume、HDFS、Twitter、Socket套接字等数据,通过复杂算法及一系列的计算分析数据,且可将分析结果存入HDFS、数据库或前端页面。
2. 工作原理
Spark的核心是RDD(或DataFrame)、对于Spark Streaming来说,它的核心是DStream。DStream是一系列RDD的集合,DStream可以按照秒数将数据流进行批量划分。
首先从接收到流数据之后,将其划分为多个批次,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS、数据库或前端页面等。
详细可以参考下图:
图1
图2
当启动Spark Streaming应用的时候,首先会在一个节点的Executor上启动一个Receiver接收者,然后当从数据源写入数据的时候会被Recevier接收,接收到数据之后Receiver会将数据Split成多个Block,然后被分到各个节点(Replicate Blocks容灾恢复),然后Receiver向Streaming Context进行块报告,说明数据在哪几个节点的Executor上,接着在一定间隔时间内StreamingContext会将数据处理为RDD,并交给SparkContext划分到各个节点进行并行计算。
StreamingContext中内部定义了SparkContext,可以通过StreamingContext.sparkContetxt进行访问。
3. Spark Streaming Demo
官方给出的例子,是从Socket源端收集数据运行wordcount的案例。具体代码如下:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object QuickStart { def main(args: Array[String]): Unit = { // 创建StreamingContext,包含2个线程,且批处理间隔为1秒 val conf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) // 创建DStream,数据源为TCP源 val lines = ssc.socketTextStream("localhost", 9999) // 将每行文本且分为单词 val words = lines.flatMap(_.split(" ")) // 计算单词个数 val pairs = words.map(word => (word, 1)) val wordCount = pairs.reduceByKey(_ + _) wordCount.print() // 执行计算 ssc.start() ssc.awaitTermination() } }
从Spark Streaming初始化的源码中可看到,其初始化有两种方式:
(1) 通过SparkConf来创建:如上例所示
(2) 通过SparkContext创建,可在Spark-Shell命令行中运行
例:在Spark-Shell中运行Spark Streaming,监控HDFS中的某个文件夹的数据传入,并按指定时间间隔统计词频
a. 编写SparkStreamingDemo.scala,并放置在Spark的某驱动节点上。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ val ssc = new StreamingContext(sc, Seconds(10)) // read data val lines = ssc.textFileStream("/music_logs/tmp/albumRecSta") // process val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
b. 运行Spark-shell,以本地模式运行:spark-shell --master local[2]
c. 在Spark-Shell中加载代码,执行如下命令:
:load /home/hadoop/test/SparkStreamingDemo.scala
d. 然后上传任意文件到给定的HDFS目录下,通过观察Spark-Shell中的执行结果,可以实时查看Spark Streaming的处理。
4. 流程
通过上述代码,可以看出Spark Streaming的编程步骤:
(1) 初始化StreamingContext
(2) 定义输入源
(3) 准备流计算指令
(4) 利用streamingContext.start()启动接口和处理数据
(5) 处理过程一直持续,知道streamingContext.stop()被调用。
注意:
a. 一旦一个context已经启动,不能有新的流算子建立或加入到上下文中;一旦一个context已经停止,就不能再重新启动
b. 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
c. streamingContext.stop()执行后,其sparkContext对象也会关闭。当stop设置参数为false时,只会关闭streamingContext。
d. 一个SparkContext可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面的StreamingContext创建之前关闭,且不关闭SparkContext。
5. 模块
(1) DStream
Spark Streaming提供的基本抽象,表示一个连续的工作流。可来自数据源获取、或者输入流通过转换算子生成处理。DStream由一系列连续的RDD组成,每个RDD都包含确定时间间隔内的数据。
上述代码中, flatMap操作应用于lines这个DStream的每个RDD,生成words这个DStream的过程如下:
(2) 输入DStream和Receiver
每个输入流DStream和一个Receiver对象关联,Receiver从源中获取数据,并将数据存入内存中用于处理。
Spark Streaming包含两类数据源:
a. 基本源:可在StreamingContext的API中直接引入,例如:文件系统(textFileStream)、套接字链接(socketTextStream)、Akka的actor等
b. 高级源:包括Kafka、Flume、Kinesis、Twitter等,需要额外的类来使用。如spark-streaming-kafka_2.10、spark-streaming-flume_2.10、spark-streaming-kinesis-asl_2.10、spark-streaming-twitter_2.10、spark-streaming-zeromq_2.10、spark-streaming-mqtt_2.10等
流应用中科创建多个输入DStream来处理多个数据流。将创建多个Receiver同时接收多个数据流。但是Receiver作为长期运行的任务运行在Spark的woker或executor中。因此占用一个核,所以需要考虑为Spark Streaming应用程序分配足够的核(如果本地运行,则为线程)。
注意:
a. 如果分配给应用程序的核数少于或等于输入DStreams或Receivers的数量,系统只能够接收数据而不能处理他们
b. 运行在本地时,只有一个核运行任务。
1) 基本源
a. 文件流:从任何与HDFS API兼容的文件系统中读取数据,创建方式:smc.fileStream[keyClass, valueClass, imputFormatClass](dataDirectory)
Spark Streaming将监控dataDirectory目录,并处理目录下生成的文件(嵌套目录不支持)。要求目录下的文件必须具有相同的数据格式;所有文件必须在dataDirectory目录下创建,文件时自动移动和重命名到目录下;一旦移动,文件必须修改。若文件被持续追加数据,新的数据不会被读取。
文件流不需要运行一个receiver,所以不需要分配核。
b. 自定义actor流:调用smc.actorStream(actorProps, actorName)方法从Akka actors获取数据流
c. RDD队列作为数据流:可调用smc.queueStream(queueOfRDDs)方法基于RDD队列创建DStreams。
代码示例:
a. 自定义Receiver
import java.io.PrintWriter import java.net.ServerSocket import scala.io.Source /** * 创建外部socket端,数据流模式器 */ object StreamingSimulation { def index(n: Int) = scala.util.Random.nextInt(n) def main(args: Array[String]): Unit = { // 调用该模拟器需要三个参数,文件路径、端口号、时间间隔 if(3 != args.length){ System.err.println("Usage: <fileName> <port> <millisecond>") System.exit(1) } // 获取指定文件总行数 val fileName = args(0) val lines = Source.fromFile(fileName).getLines.toList val fileRow = lines.size // 指定监听某端口,但外部程序请求时建立连接 val listener = new ServerSocket(args(1).toInt) while (true){ val socket = listener.accept() new Thread(){ override def run(): Unit = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream, true) while(true){ Thread.sleep(args(2).toLong) // 当该端口接收请求时,随机获取某行数据发送给对方 val content = lines(index(fileRow)) println("-------------------------------------------") println(s"Time: ${System.currentTimeMillis()}") println("-------------------------------------------") println(content) out.write(content + "\n") out.flush() } socket.close() } } } } }