对于NetworkInputDStream而言,其实不是真正的流方式,将数据读出来后不是直接去处理,而是先写到blocks中,后面的RDD再从blocks中读取数据继续处理
这就是一个将stream离散化的过程
NetworkInputDStream就是封装了将数据从source中读出来,然后放到blocks里面去的逻辑(Receiver线程)
还需要一个可以管理NetworkInputDStream,以及把NetworkInputDStream.Receiver部署到集群上执行的角色,这个就是NetworkInputTracker
NetworkInputTracker会负责执行一个独立的job,把各个Receiver以RDD的task的形式,分布到各个worknode上去执行

InputDStream

/**
 * This is the abstract base class for all input streams. This class provides methods
 * start() and stop() which is called by Spark Streaming system to start and stop receiving data.
 * Input streams that can generate RDDs from new data by running a service/thread only on
 * the driver node (that is, without running a receiver on worker nodes), can be
 * implemented by directly inheriting this InputDStream. For example,
 * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
 * new files and generates RDDs with the new files. For implementing input streams
 * that requires running a receiver on the worker nodes, use
 * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
 *
 * @param ssc_ Streaming context that will execute this input stream
 */
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  ssc.graph.addInputStream(this) // 首先将InputStream加入graph中

  override def dependencies = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }

  /** Method called to start receiving data. Subclasses must implement this method. */
  def start()

  /** Method called to stop receiving data. Subclasses must implement this method. */
  def stop()
}

 

NetworkInputDStream

NetworkInputDStream是比较典型的Input,主要接口两个
getReceiver,Receiver对于NetworkInputDStream是最关键的,里面封装了如果从数据源读到数据,如果切分并写到blocks中去
compute,由于Receiver只会把数据写到blocks中去,问题我们如何取到这些数据了?
Receiver在写block的同时,会发event给networkInputTracker注册block
所以NetworkInputDStream.compute是无法直接算出数据来,而是先从networkInputTracker查询出blockids,并从BlockManager中读出数据

/**
 * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
 * that has to start a receiver on worker nodes to receive external data.
 * Specific implementations of NetworkInputDStream must
 * define the getReceiver() function that gets the receiver object of type
 * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
 * to the workers to receive data.
 * @param ssc_ Streaming context that will execute this input stream
 * @tparam T Class type of the object of this stream
 */
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  // This is an unique identifier that is used to match the network receiver with the
  // corresponding network input stream.
  val id = ssc.getNewNetworkStreamId() // network stream id

  /**
   * Gets the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to defined by any specific implementation
   * of a NetworkInputDStream.
   */
  def getReceiver(): NetworkReceiver[T]

  override def compute(validTime: Time): Option[RDD[T]] = {
    // If this is called for any time before the start time of the context,
    // then this returns an empty RDD. This may happen when recovering from a
    // master failure
    if (validTime >= graph.startTime) {
      val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)  // 从networkInputTracker中查询blockids
      Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }
}


NetworkReceiver

private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
  extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage

/**
 * Abstract class of a receiver that can be run on worker nodes to receive external data. See
 * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
 */
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {

  lazy protected val env = SparkEnv.get

  lazy protected val actor = env.actorSystem.actorOf(  // 创建NetworkReceiverActor(lazy),用于和networkInputTracker通信
    Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId)

  lazy protected val receivingThread = Thread.currentThread()

  protected var streamId: Int = -1

  /**
   * This method will be called to start receiving data. All your receiver
   * starting code should be implemented by defining this function.
   */
  protected def onStart()

  /** This method will be called to stop receiving data. */
  protected def onStop()

  /** Conveys a placement preference (hostname) for this receiver. */
  def getLocationPreference() : Option[String] = None

  /**
   * Starts the receiver. First is accesses all the lazy members to
   * materialize them. Then it calls the user-defined onStart() method to start
   * other threads, etc required to receiver the data.
   */
  def start() {
    try {
      // Access the lazy vals to materialize them
      env
      actor
      receivingThread

      // Call user-defined onStart()
      onStart()
    } catch {
      case ie: InterruptedException =>
        logInfo("Receiving thread interrupted")
        //println("Receiving thread interrupted")
      case e: Exception =>
        stopOnError(e)
    }
  }

  /**
   * Stops the receiver. First it interrupts the main receiving thread,
   * that is, the thread that called receiver.start(). Then it calls the user-defined
   * onStop() method to stop other threads and/or do cleanup.
   */
  def stop() {
    receivingThread.interrupt()
    onStop()
    //TODO: terminate the actor
  }

  /**
   * Stops the receiver and reports exception to the tracker.
   * This should be called whenever an exception is to be handled on any thread
   * of the receiver.
   */
  protected def stopOnError(e: Exception) {
    logError("Error receiving data", e)
    stop()
    actor ! ReportError(e.toString)
  }


  /**
   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
   */
  def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
    env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
    actor ! ReportBlock(blockId, metadata)
  }

  /**
   * Pushes a block (as bytes) into the block manager.
   */
  def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
    env.blockManager.putBytes(blockId, bytes, level)
    actor ! ReportBlock(blockId, metadata)
  }
}

NetworkReceiverActor
用于将Receiver的event转发给TrackerActor

/** A helper actor that communicates with the NetworkInputTracker */
  private class NetworkReceiverActor extends Actor {
    logInfo("Attempting to register with tracker")
    val ip = env.conf.get("spark.driver.host", "localhost")
    val port = env.conf.getInt("spark.driver.port", 7077)
    val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
    val tracker = env.actorSystem.actorSelection(url)
    val timeout = 5.seconds

    override def preStart() {
      val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
      Await.result(future, timeout)
    }

    override def receive() = {
      case ReportBlock(blockId, metadata) =>
        tracker ! AddBlocks(streamId, Array(blockId), metadata)
      case ReportError(msg) =>
        tracker ! DeregisterReceiver(streamId, msg)
      case StopReceiver(msg) =>
        stop()
        tracker ! DeregisterReceiver(streamId, msg)
    }
  }

  protected[streaming] def setStreamId(id: Int) {
    streamId = id
  }

BlockGenerator
3个关键的接口,
+=,用于调用者将数据不断加到currentBuffer上
updateCurrentBuffer,定时将currentBuffer的数据,生成block对象放到blocksForPushing队列上(blockIntervalTimer调用)
keepPushingBlocks, 不断将
blocksForPushing队列上的blocks取出,并写到blockmanager中去(blockPushingThread调用)

/**
   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
   * them into appropriately named blocks at regular intervals. This class starts two threads,
   * one to periodically start a new batch and prepare the previous batch of as a block,
   * the other to push the blocks into the block manager.
   */
  class BlockGenerator(storageLevel: StorageLevel)
    extends Serializable with Logging {

    case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)

    val clock = new SystemClock()
    val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
    val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
    val blockStorageLevel = storageLevel
    val blocksForPushing = new ArrayBlockingQueue[Block](1000)
    val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

    var currentBuffer = new ArrayBuffer[T]

    def start() {
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Data handler started")
    }

    def stop() {
      blockIntervalTimer.stop()
      blockPushingThread.interrupt()
      logInfo("Data handler stopped")
    }

    def += (obj: T): Unit = synchronized {
      currentBuffer += obj
    }

    private def updateCurrentBuffer(time: Long): Unit = synchronized {
      try {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[T]
        if (newBlockBuffer.size > 0) {
          val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval)
          val newBlock = new Block(blockId, newBlockBuffer)
          blocksForPushing.add(newBlock)
        }
      } catch {
        case ie: InterruptedException =>
          logInfo("Block interval timer thread interrupted")
        case e: Exception =>
          NetworkReceiver.this.stop()
      }
    }

    private def keepPushingBlocks() {
      logInfo("Block pushing thread started")
      try {
        while(true) {
          val block = blocksForPushing.take()
          NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
        }
      } catch {
        case ie: InterruptedException =>
          logInfo("Block pushing thread interrupted")
        case e: Exception =>
          NetworkReceiver.this.stop()
      }
    }
  }

 

SocketInputDStream

Socket作为最为典型的NetworkInputDStream,看看是如何实现的
对于SocketInputDStream,关键实现getReceiver接口,可以获取SocketReceiver对象
而对于SocketReceiver关键是实现onStart接口,将从socket上读到的数据写到blockGenerator的currentBuffer上

private[streaming]
class SocketInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[T](ssc_) {

  def getReceiver(): NetworkReceiver[T] = {  //关键是实现getReceiver接口
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends NetworkReceiver[T] {

  lazy protected val blockGenerator = new BlockGenerator(storageLevel) // 创建BlockGenerator

  override def getLocationPreference = None

  protected def onStart() {
    val socket = new Socket(host, port)
    blockGenerator.start()
    val iterator = bytesToObjects(socket.getInputStream())
    while(iterator.hasNext) {
      val obj = iterator.next
      blockGenerator += obj // 核心逻辑就是将从socket上读到的数据写到blockGenerator的currentBuffer上
    }
  }

  protected def onStop() {
    blockGenerator.stop()
  }

}

 

NetworkInputTracker

NetworkInputTracker用于管理和监控所有的NetworkInputDStream
首先NetworkInputTrackerActor,可以从NetworkInputDStream接收RegisterReceiver,AddBlocks,和DeregisterReceiver事件
从而知道有多少NetworkInputDStream,并且每个读取并存储了多少的blocks

再者,在ReceiverExecutor中他负责启动所有NetworkInputDStream的Receivers,做法比较奇特,也是依赖于RDD
将每个receiver封装在RDD的一个partition里,partition会作为一个task被调度,最后runjob去执行startReceiver,这样每个receiver都会在task被执行的时候start

而外部通过getBlockIds,来取得某NetworkInputDStream所有的blockids,从而取到数据

//定义Tracker可能从receiver收到的event类型 
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
  extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
  extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
  extends NetworkInputTrackerMessage

/**
 * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
 * this class must be created after all input streams have been added and StreamingContext.start()
 * has been called because it needs the final set of input streams at the time of instantiation.
 */
private[streaming]
class NetworkInputTracker(ssc: StreamingContext) extends Logging {

  val networkInputStreams = ssc.graph.getNetworkInputStreams() //获取所有的networkInputStreams 
  val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
  val receiverExecutor = new ReceiverExecutor()
  val receiverInfo = new HashMap[Int, ActorRef]    //用于记录所有receivers的信息
  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]  //用于记录每个InputDStream接受到的blockids
  val timeout = AkkaUtils.askTimeout(ssc.conf)


  // actor is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  var actor: ActorRef = null
  var currentTime: Time = null

  /** Start the actor and receiver execution thread. */
  def start() {
    if (!networkInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),  // 创建NetworkInputTrackerActor,用于和receivers通信
        "NetworkInputTracker")
      receiverExecutor.start()  // 启动receiverExecutor
    }
  }

  /** Stop the receiver execution thread. */
  def stop() {
    if (!networkInputStreams.isEmpty && actor != null) {
      receiverExecutor.interrupt()
      receiverExecutor.stopReceivers()
      ssc.env.actorSystem.stop(actor)
      logInfo("NetworkInputTracker stopped")
    }
  }

  /** Return all the blocks received from a receiver. */
  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { //用于获取某个InputDStream相关的blockids
    val queue =  receivedBlockIds.synchronized {
      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
    }
    val result = queue.synchronized {
      queue.dequeueAll(x => true)
    }
    logInfo("Stream " + receiverId + " received " + result.size + " blocks")
    result.toArray
  }

  /** Actor to receive messages from the receivers. */
  private class NetworkInputTrackerActor extends Actor {
    def receive = {
      case RegisterReceiver(streamId, receiverActor) => { // Receiver向traker发送的register事件
        receiverInfo += ((streamId, receiverActor))  // 将该Receiver加入receiverInfo
        sender ! true
      }
      case AddBlocks(streamId, blockIds, metadata) => {
        val tmp = receivedBlockIds.synchronized {
          if (!receivedBlockIds.contains(streamId)) {
            receivedBlockIds += ((streamId, new Queue[BlockId]))  // Receiver通知tracker接受到新的block
          }
          receivedBlockIds(streamId)
        }
        tmp.synchronized {
          tmp ++= blockIds
        }
        networkInputStreamMap(streamId).addMetadata(metadata)
      }
      case DeregisterReceiver(streamId, msg) => {  // Receiver取消注册
        receiverInfo -= streamId
      }
    }
  }

  /** This thread class runs all the receivers on the cluster.  */
  class ReceiverExecutor extends Thread {
    val env = ssc.env

    override def run() {
      try {
        SparkEnv.set(env)
        startReceivers()  //启动所有的Receivers
      } catch {
        case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
      } finally {
        stopReceivers()
      }
    }

    /**
     * Get the receivers from the NetworkInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    def startReceivers() {
      val receivers = networkInputStreams.map(nis => { //取出所有networkInputStreams的receivers
        val rcvr = nis.getReceiver()
        rcvr.setStreamId(nis.id)
        rcvr
      })

      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)  //看看是否有LocationPreferences
        .reduce(_ && _)

      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences =
            receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
          ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
        }
        else {
          ssc.sc.makeRDD(receivers, receivers.size) //makeRDD,使用ParallelCollectionRDD,这里其实就是将每个receiver封装成RDD的一个partition(task)
        }

      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
        if (!iterator.hasNext) {
          throw new Exception("Could not start receiver as details not found.")
        }
        iterator.next().start() //启动receiver
      }
      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // Distribute the receivers and start them
      ssc.sparkContext.runJob(tempRDD, startReceiver)  //被封装成task的receiver会在workernode上调用startReceiver,而startReceiver最终调用receiver.start()
    }

    /** Stops the receivers. */
    def stopReceivers() {
      // Signal the receivers to stop
      receiverInfo.values.foreach(_ ! StopReceiver)
    }
  }
}

相关文章: