1 通信业务逻辑

定义2个类 Master, Worker。首先启动Master,然后启动Worker

  1. Worker 启动后,在 preStart 方法中与 Master 建立连接,并向 Master 发送注册消息:Worker 的信息通过 case class(RegisterWorker)封装后发送给 Master。
  2. Master 接收 Worker 的注册消息后,将Worker 的信息保存起来。然后向Worker反馈注册成功。
  3. Worker 定期向 Master 发送心跳
  4. Master 会定时检测并移除心跳超时的 Worker

2 源码

2.1 WorkerInfo.scala

package cn.tzb.rpc

/**
 * Bookkeeping record for one registered worker.
 *
 * @param id     identifier the worker registered with
 * @param memory memory value reported by the worker at registration
 * @param cores  core count reported by the worker at registration
 */
class WorkerInfo(val id: String, val memory: Int, val cores: Int) {

  /** Time of the most recent heartbeat; stays 0 until one is recorded. */
  var lastHeartbeatTime: Long = 0L
}

2.2 RemoteMessage.scala

package cn.tzb.rpc

/** Marker for messages exchanged between actor systems; they must be serializable. */
trait RemoteMessage extends Serializable

// ---- Worker -> Master ----

/** Registration request carrying the worker's id and its reported resources. */
case class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessage

/** Liveness report from the worker identified by `id`. */
case class Heartbeat(id: String) extends RemoteMessage

// ---- Master -> Worker ----

/** Registration acknowledgement carrying the Master's URL. */
case class RegisteredWorker(masterUrl: String) extends RemoteMessage

// ---- Worker -> self ----

/** Timer tick telling the Worker to send a heartbeat. */
case object SendHeartbeat

// ---- Master -> self ----

/** Timer tick telling the Master to look for timed-out workers. */
case object CheckTimeOutWorker

2.3 Master.scala

package cn.tzb.rpc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Master actor: keeps a registry of workers, records their heartbeats and
 * periodically evicts workers whose last heartbeat is older than CHECK_INTERVAL.
 *
 * @param host hostname used to build the Master URL returned to workers
 * @param port port used to build the Master URL returned to workers
 */
class Master(val host: String, val port: Int) extends Actor {

  // id -> WorkerInfo
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // All currently registered workers.
  val workers = new mutable.HashSet[WorkerInfo]()

  // Interval between timeout checks, in milliseconds; also used as the staleness threshold.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic timeout check, kept so postStop can cancel it.
  private var timeoutCheck: Option[akka.actor.Cancellable] = None

  /** Starts the periodic timeout check that sends CheckTimeOutWorker to this actor. */
  override def preStart(): Unit = {
    println("preStart invoked !")

    // The scheduler requires an implicit ExecutionContext.
    import context.dispatcher
    timeoutCheck = Some(
      context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker))
  }

  /** Cancels the periodic check so a stopped or restarted actor does not leave a timer running. */
  override def postStop(): Unit = {
    timeoutCheck.foreach(_.cancel())
    timeoutCheck = None
  }

  override def receive: Receive = {

    case RegisterWorker(id, memory, cores) =>
      // Only workers that are not yet known are registered and acknowledged.
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, memory, cores)
        // Count the registration as the first sign of life. Otherwise lastHeartbeatTime
        // stays 0 and a timeout check that runs before the first heartbeat evicts the worker.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker(id) = workerInfo
        workers += workerInfo
        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
      }

    case Heartbeat(id) =>
      // Heartbeats from unknown ids are ignored.
      idToWorker.get(id).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // filter builds a separate set, so removing from `workers` below is safe.
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      for (w <- toRemove) {
        workers -= w
        idToWorker -= w.id
      }
      println(workers.size)
  }

}

/** Command-line launcher for the Master actor system. */
object Master {

  /**
   * Starts the "MasterSystem" actor system with one "Master" actor and blocks
   * until the system terminates.
   *
   * @param args args(0) is the hostname to bind, args(1) the port to bind
   */
  def main(args: Array[String]): Unit = {
    val bindHost = args(0)
    val bindPort = args(1).toInt

    // Remoting settings for this actor system.
    val remoteConfig = ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$bindHost"
         |akka.remote.netty.tcp.port = "$bindPort"
       """.stripMargin)

    val system = ActorSystem("MasterSystem", remoteConfig)

    // Create the Master actor under the name workers look it up by.
    system.actorOf(Props(new Master(bindHost, bindPort)), "Master")

    system.awaitTermination()
  }
}

Scala学习笔记(11)—— RPC 通信框架

2.4 Worker.scala

package cn.tzb.rpc

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Worker actor: registers with the Master on start-up and, once the registration
 * is acknowledged, sends a heartbeat to the Master every HEART_INTERVAL milliseconds.
 *
 * @param masterHost hostname of the Master's actor system
 * @param masterPort port of the Master's actor system
 * @param memory     memory value reported to the Master at registration
 * @param cores      core count reported to the Master at registration
 */
class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Selection pointing at the Master actor; assigned in preStart.
  var master: ActorSelection = _

  // Random id that identifies this worker in registration and heartbeat messages.
  val workerId = UUID.randomUUID().toString

  // Interval between heartbeats, in milliseconds.
  val HEART_INTERVAL = 10000

  // Handle of the periodic heartbeat tick, kept so it can be cancelled.
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  /** Looks up the Master and sends it this worker's registration. */
  override def preStart(): Unit = {
    // System name "MasterSystem" and actor name "Master" must match what the Master launcher creates.
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")

    master ! RegisterWorker(workerId, memory, cores)
  }

  /** Cancels the heartbeat tick so a stopped or restarted actor does not leave a timer running. */
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    case RegisteredWorker(masterUrl) =>
      println(masterUrl)

      // The scheduler requires an implicit ExecutionContext.
      import context.dispatcher
      // Drop any earlier tick first so a repeated acknowledgement cannot stack timers.
      heartbeatTask.foreach(_.cancel())
      heartbeatTask = Some(
        context.system.scheduler.schedule(0.millis, HEART_INTERVAL.millis, self, SendHeartbeat))

    case SendHeartbeat =>
      println("send heartbeat to master")
      master ! Heartbeat(workerId)
  }
}

/** Command-line launcher for the Worker actor system. */
object Worker {

  /**
   * Starts the "WorkerSystem" actor system with one "Worker" actor and blocks
   * until the system terminates.
   *
   * @param args args(0)/args(1): hostname and port to bind;
   *             args(2)/args(3): Master hostname and port;
   *             args(4): memory; args(5): cores
   */
  def main(args: Array[String]): Unit = {
    // Local bind address.
    val bindHost = args(0)
    val bindPort = args(1).toInt

    // Address of the Master to register with.
    val targetHost = args(2)
    val targetPort = args(3).toInt

    // Resources this worker reports.
    val memoryArg = args(4).toInt
    val coresArg = args(5).toInt

    // Remoting settings for this actor system.
    val remoteConfig = ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$bindHost"
         |akka.remote.netty.tcp.port = "$bindPort"
       """.stripMargin)

    val system = ActorSystem("WorkerSystem", remoteConfig)
    system.actorOf(Props(new Worker(targetHost, targetPort, memoryArg, coresArg)), "Worker")
    system.awaitTermination()
  }

}

Scala学习笔记(11)—— RPC 通信框架

2.5 运行结果

Master
Scala学习笔记(11)—— RPC 通信框架

Worker
Scala学习笔记(11)—— RPC 通信框架

相关文章: