1.  介绍

Spark Streaming是Spark生态系统中一个重要的框架,建立在Spark Core之上,与Spark SQL、GraphX、MLib相并列。

Spark Streaming是Spark Core的扩展应用,具有可扩展性、高吞吐量、可容错性等特点。

可以监控来自Kafka、Flume、HDFS、Twitter、Socket套接字等数据,通过复杂算法及一系列的计算分析数据,且可将分析结果存入HDFS、数据库或前端页面。

Spark Streaming初探 

2. 工作原理

Spark的核心是RDD(或DataFrame)、对于Spark Streaming来说,它的核心是DStream。DStream是一系列RDD的集合,DStream可以按照秒数将数据流进行批量划分。

首先从接收到流数据之后,将其划分为多个批次,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS、数据库或前端页面等。

详细可以参考下图:

Spark Streaming初探

图1

Spark Streaming初探

图2

当启动Spark Streaming应用的时候,首先会在一个节点的Executor上启动一个Receiver接收者,然后当从数据源写入数据的时候会被Recevier接收,接收到数据之后Receiver会将数据Split成多个Block,然后被分到各个节点(Replicate Blocks容灾恢复),然后Receiver向Streaming Context进行块报告,说明数据在哪几个节点的Executor上,接着在一定间隔时间内StreamingContext会将数据处理为RDD,并交给SparkContext划分到各个节点进行并行计算。

StreamingContext中内部定义了SparkContext,可以通过StreamingContext.sparkContetxt进行访问。

3. Spark Streaming Demo

官方给出的例子,是从Socket源端收集数据运行wordcount的案例。具体代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QuickStart {
  def main(args: Array[String]): Unit = {
    // 创建StreamingContext,包含2个线程,且批处理间隔为1秒
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // 创建DStream,数据源为TCP源
    val lines = ssc.socketTextStream("localhost", 9999)

    // 将每行文本且分为单词
    val words = lines.flatMap(_.split(" "))

    // 计算单词个数
    val pairs = words.map(word => (word, 1))
    val wordCount = pairs.reduceByKey(_ + _)
    wordCount.print()

    // 执行计算
    ssc.start()
    ssc.awaitTermination()
  }
} 

从Spark Streaming初始化的源码中可看到,其初始化有两种方式:

(1) 通过SparkConf来创建:如上例所示

(2) 通过SparkContext创建,可在Spark-Shell命令行中运行

例:在Spark-Shell中运行Spark Streaming,监控HDFS中的某个文件夹的数据传入,并按指定时间间隔统计词频

a.  编写SparkStreamingDemo.scala,并放置在Spark的某驱动节点上。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(10))

// read data
val lines = ssc.textFileStream("/music_logs/tmp/albumRecSta")

// process
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()  // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate 

b. 运行Spark-shell,以本地模式运行:spark-shell --master local[2]

c. 在Spark-Shell中加载代码,执行如下命令:

:load /home/hadoop/test/SparkStreamingDemo.scala 

d. 然后上传任意文件到给定的HDFS目录下,通过观察Spark-Shell中的执行结果,可以实时查看Spark Streaming的处理。

4. 流程

通过上述代码,可以看出Spark Streaming的编程步骤:

(1) 初始化StreamingContext

(2) 定义输入源

(3) 准备流计算指令

(4) 利用streamingContext.start()启动接口和处理数据

(5) 处理过程一直持续,知道streamingContext.stop()被调用。

注意:

a. 一旦一个context已经启动,不能有新的流算子建立或加入到上下文中;一旦一个context已经停止,就不能再重新启动

b. 在JVM中,同一时间只能有一个StreamingContext处于活跃状态

c. streamingContext.stop()执行后,其sparkContext对象也会关闭。当stop设置参数为false时,只会关闭streamingContext。

d. 一个SparkContext可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面的StreamingContext创建之前关闭,且不关闭SparkContext。

5. 模块

(1) DStream

Spark Streaming提供的基本抽象,表示一个连续的工作流。可来自数据源获取、或者输入流通过转换算子生成处理。DStream由一系列连续的RDD组成,每个RDD都包含确定时间间隔内的数据。

Spark Streaming初探

上述代码中, flatMap操作应用于lines这个DStream的每个RDD,生成words这个DStream的过程如下:

Spark Streaming初探

(2) 输入DStream和Receiver

每个输入流DStream和一个Receiver对象关联,Receiver从源中获取数据,并将数据存入内存中用于处理。

Spark Streaming包含两类数据源:

a. 基本源:可在StreamingContext的API中直接引入,例如:文件系统(textFileStream)、套接字链接(socketTextStream)、Akka的actor等

b. 高级源:包括Kafka、Flume、Kinesis、Twitter等,需要额外的类来使用。如spark-streaming-kafka_2.10、spark-streaming-flume_2.10、spark-streaming-kinesis-asl_2.10、spark-streaming-twitter_2.10、spark-streaming-zeromq_2.10、spark-streaming-mqtt_2.10等

流应用中科创建多个输入DStream来处理多个数据流。将创建多个Receiver同时接收多个数据流。但是Receiver作为长期运行的任务运行在Spark的woker或executor中。因此占用一个核,所以需要考虑为Spark Streaming应用程序分配足够的核(如果本地运行,则为线程)。

注意:

a. 如果分配给应用程序的核数少于或等于输入DStreams或Receivers的数量,系统只能够接收数据而不能处理他们

b. 运行在本地时,只有一个核运行任务。

 1) 基本源

a. 文件流:从任何与HDFS API兼容的文件系统中读取数据,创建方式:smc.fileStream[keyClass, valueClass, imputFormatClass](dataDirectory)

Spark Streaming将监控dataDirectory目录,并处理目录下生成的文件(嵌套目录不支持)。要求目录下的文件必须具有相同的数据格式;所有文件必须在dataDirectory目录下创建,文件时自动移动和重命名到目录下;一旦移动,文件必须修改。若文件被持续追加数据,新的数据不会被读取。

文件流不需要运行一个receiver,所以不需要分配核。

b. 自定义actor流:调用smc.actorStream(actorProps, actorName)方法从Akka actors获取数据流

c. RDD队列作为数据流:可调用smc.queueStream(queueOfRDDs)方法基于RDD队列创建DStreams。

代码示例:

a. 自定义Receiver

import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

/**
  * 创建外部socket端,数据流模式器
  */
object StreamingSimulation {

  def index(n: Int) = scala.util.Random.nextInt(n)

  def main(args: Array[String]): Unit = {
    // 调用该模拟器需要三个参数,文件路径、端口号、时间间隔
    if(3 != args.length){
      System.err.println("Usage: <fileName> <port> <millisecond>")
      System.exit(1)
    }

    // 获取指定文件总行数
    val fileName = args(0)
    val lines = Source.fromFile(fileName).getLines.toList
    val fileRow = lines.size

    // 指定监听某端口,但外部程序请求时建立连接
    val listener = new ServerSocket(args(1).toInt)

    while (true){
      val socket = listener.accept()
      new Thread(){
        override def run(): Unit = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while(true){
            Thread.sleep(args(2).toLong)
            // 当该端口接收请求时,随机获取某行数据发送给对方
            val content = lines(index(fileRow))
            println("-------------------------------------------")
            println(s"Time: ${System.currentTimeMillis()}")
            println("-------------------------------------------")
            println(content)
            out.write(content + "\n")
            out.flush()
          }
          socket.close()
        }
      }
    }
  }
}
View Code

相关文章: