从数据处理的方式角度:

流式(Streaming)数据处理;

批量(batch)数据处理;

从数据处理延迟的长短:

实时数据处理: 毫秒级别;(流式处理 != 实时数据处理)

离线数据处理: 小时 or 天级别

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

Spark Streaming准实时(秒,分钟),微批次(时间)的数据处理框架 

 Spark |05 SparkStreaming

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream(不连续的流。DStream 是随时间推移而收到的数据的序列。在

内部,每个时间区间收到的数据都作为 RDD存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。所以 简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

离线数据:不可改变数据;实时数据:改变对数据;  流式处理批量处理

批量(微批次,不是流式处理)

Spark |05 SparkStreaming

什么是DStream

    DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据;离散流, 一个或多个RDD

2. 架构

特点

易用、 容错、 易整合到spark体系

整体架构

   Spark |05 SparkStreaming

 

 SparkStreaming架构

Spark |05 SparkStreaming

背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参 数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来

适配当前的处理能力,防止内存溢出,但也会引入其它问题(如果接收数据太快,消费太难,就会有积压;如果接收的很慢,消费的很快,就会资源浪费)。

比如:producer 数据生产高于 maxRate,当 前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。 为了更好的协调数据接收速率与资源处理能力,1.5 版本开始

Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure):根据 JobScheduler 反馈作业的执行信息来动态调整

Receiver 数据接收率。 通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。

WordCount

导入依赖

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.12</artifactId>
   <version>3.0.0</version>
</dependency>

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

StreamingContext中有这个构造方法: def this(conf: SparkConf, batchDuration: Duration)
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //第二个参数表示批量处理的周期
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    //逻辑处理
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordToOne: DStream[(String, Int)] = words.map((_, 1))
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_)
    wordToCount.print()
    //由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭 ssc.stop()
    //如果main方法执行完毕, 应用程序也会自动结束,所以不能让main执行完毕
    //1. 启动采集器
    ssc.start()
    //2. 等待采集器的关闭
    ssc.awaitTermination()
  }
[kris@hadoop101 ~]$ nc -lk 9999
Hello world
Hello 
Hello java
Hello spark

如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件放中resources里边,日志级别改成ERROR

DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据

3. DStream 创建

1 RDD 队列

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCoun

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //第二个参数表示批量处理的周期
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
    val reduceStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_+_)
    reduceStream.print()
    //1. 启动采集器
    ssc.start()
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    //2. 等待采集器的关闭
    ssc.awaitTermination()
  }
View Code

相关文章: