Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

1. 什么是RPC   

    RPC(Remote Procedure Call)远程过程调用。在Hadoop和Spark中都使用了PRC,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。简单来说,就是有A、B两台机器,A机器可以调用B机器上的程序。

2. Spark 的RPC

    Master和Worker的启动流程:

    (1) 启动Master,会启动一个定时器,定时检查超时的Worker,并移除超时Worker信息。

    (2) 启动Worker,向Master发送注册信息。

    (3) Master收到Worker发来的注册信息后,保存到内存中,并返回一个响应信息,这个信息就是自己的masterUrl。

    (4) Worker接收到Master发来的响应信息(masterUrl)之后,保存到内存中,并开启一个定时器,定时向Master发送心跳信息。

    (5) Master 不断的接收Worker发来的心跳信息,并将每个Worker的最后一次心跳时间为当前接收到心跳信息的时间。

    流程如下图。

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

3. 编程实战

3.1 项目代码(Scala语言)

    WorkInfo.scala

package com.nova.rpc 
/** 
  * @author Supernova 
  * @date 2018/06/15 
  */ 
class WorkerInfo(val id: String, val host: String, val port: Int,val memory: Int, val cores: Int) { 
  // 记录最后一次心跳时间 
  var lastHeartbeatTime: Long = _ 
}

    RemoteMsg.scala 

package com.nova.rpc 
 
/** 
  * @author Supernova 
  * @date 2018/06/15 
  */ 
trait RemoteMsg extends Serializable{ 
 
} 
 
// Master 向自己发送检查超时Worker的信息 
case object CheckTimeOutWorker 
 
// Worker向Master发送的注册信息 
case class RegisterWorker(id: String, host: String,port: Int, memory: Int, cores: Int) extends RemoteMsg 
 
// Master向Worker发送的响应信息 
case class RegisteredWorker(masterUrl: String) extends RemoteMsg 
 
// Worker向Master发送的心跳信息 
case class Heartbeat(workerId: String) extends RemoteMsg 
 
// Worker向自己发送的要执行发送心跳信息的消息 
case object SendHeartbeat 

  

Master.scala    

package com.nova.rpc 
 
import akka.actor.{Actor, ActorSystem, Props} 
import com.typesafe.config.{Config, ConfigFactory} 
 
import scala.collection.mutable 
import scala.concurrent.duration._ 
 
/** 
  * @author Supernova 
  * @date 2018/06/15 
  */ 
class Master(val masterHost: String, val masterPort: Int) extends Actor{ 
  // 用来存储Worker的注册信息: <workerId, WorkerInfo> 
  val idToWorker = new mutable.HashMap[String, WorkerInfo]() 
 
  // 用来存储Worker的信息,必须使用可变的HashSet 
  val workers = new mutable.HashSet[WorkerInfo]() 
 
  // Worker的超时时间间隔 
  val checkInterval: Long = 15000 
 
  /** 
    * 重写生命周期preStart方法 
    * 作用:当Master启动时,开启定时器,定时检查超时Worker 
    */ 
  override def preStart(): Unit = { 
    // 启动定时器,定时检查超时的Worker 
    import context.dispatcher 
    context.system.scheduler.schedule(0 millis,checkInterval millis, self,CheckTimeOutWorker) 
  } 
 
  /** 
    *  重写生命周期receive方法 
    *  作用: 
    *  1.接收Worker发来的注册信息 
    *  2.不断接收Worker发来的心跳信息,并更新最后一次心跳时间 
    *  3.过滤出超时的Worker并移除 
    */ 
  override def receive = { 
 
    // 接收Worker给Master发送过来的注册信息 
    case RegisterWorker(id, host, port, memory, cores) => { 
      //判断改Worker是否已经注册过,已注册的不执行任何操作,未注册的将进行注册 
      if (!idToWorker.contains(id)) { 
        val workerInfo = new WorkerInfo(id, host, port, memory, cores) 
 
        idToWorker += (id -> workerInfo) 
        workers += workerInfo 
 
        println("一个新的Worker注册成功") 
 
        //向Worker发送响应信息,将masterUrl返回 
        sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" + 
          s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}") 
      } 
    } 
    //接收Worker发来的心跳信息 
    case Heartbeat(workerId) => { 
      // 通过传输过来的workerId获取对应的WorkerInfo 
      val workerInfo = idToWorker(workerId) 
      // 获取当前时间 
      val currentTime = System.currentTimeMillis() 
      // 更新最后一次心跳时间 
      workerInfo.lastHeartbeatTime = currentTime 
    } 
    //检查超时Worker并移除 
    case CheckTimeOutWorker => { 
      val currentTime = System.currentTimeMillis() 
      // 把超时的Worker过滤出来 
      val toRemove: mutable.HashSet[WorkerInfo] = 
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval) 
      // 将超时的Worker移除 
      toRemove.foreach(deadWorker => { 
        idToWorker -= deadWorker.id 
        workers -= deadWorker 
      }) 
    } 
    println(s"当前Worker的数量: ${workers.size}") 
  } 
} 
 
object Master{ 
  val MASTER_SYSTEM = "MasterSystem" 
  val MASTER_ACTOR = "Master" 
 
  def main(args: Array[String]): Unit = { 
 
    val host = args(0) // 通过main方法参数制定master主机名 
    val port = args(1).toInt //通过main方法参数指定Master的端口号 
 
    //akka配置信息 
    val configStr: String = 
      s""" 
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
         |akka.remote.netty.tcp.hostname = "$host" 
         |akka.remote.netty.tcp.port = "$port" 
      """.stripMargin 
 
    // 配置创建Actor需要的配置信息 
    val config: Config = ConfigFactory.parseString(configStr) 
 
    // 创建ActorSystem 
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config) 
 
    // 用actorSystem实例创建Actor 
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR) 
 
    actorSystem.awaitTermination() 
  } 
} 

  

Worker.scala

package com.nova.rpc 
 
import java.util.UUID 
import akka.actor.{Actor, ActorSelection, ActorSystem, Props} 
import com.typesafe.config.{Config, ConfigFactory} 
 
import scala.concurrent.duration._ 
/** 
  * @author Supernova 
  * @date 2018/06/15 
  */ 
class Worker(val host: String, val port: Int, val masterHost: String,val masterPort: Int, val memory: Int, val cores: Int) extends Actor{ 
 
  // 生成一个Worker ID 
  val workerId: String = UUID.randomUUID().toString 
 
  // 用来存储MasterUrl 
  var masterUrl: String = _ 
 
  // 心跳时间间隔 
  val heartbeat_interval: Long = 10000 
 
  // Master的Actor 
  var master: ActorSelection = _ 
 
  /** 
    * 生命周期preStart方法 
    * 作用:当启动Worker时,向master发送注册信息 
    */ 
  override def preStart(): Unit = { 
    // 获取Master的Actor 
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" + 
      s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}") 
    master ! RegisterWorker(workerId, host, port, memory, cores) 
  } 
 
  /** 
    * 生命周期receive方法 
    * 作用: 
    * 定时向Master发送心跳信息 
    */ 
  override def receive: Receive = { 
    // Worker接收到Master发送过来的注册成功的信息(masterUrl) 
    case RegisteredWorker(masterUrl) => { 
      this.masterUrl = masterUrl 
      // 启动一个定时器, 定时的给Master发送心跳 
      import context.dispatcher 
      context.system.scheduler.schedule( 
        0 millis, heartbeat_interval millis, self, SendHeartbeat) 
    } 
    case SendHeartbeat => { 
      // 向Master发送心跳信息 
      master ! Heartbeat(workerId) 
    } 
  } 
 
} 
 
object Worker{ 
  val WORKER_SYSTEM = "WorkerSystem" 
  val WORKER_ACTOR = "Worker" 
 
  def main(args: Array[String]): Unit = { 
    /** 
      * 通过main方法参数指定相应的 
      * worker主机名、端口号,master主机名、端口号,使用的内存和核数 
      */ 
    val host = args(0) 
    val port = args(1).toInt 
    val masterHost = args(2) 
    val masterPort = args(3).toInt 
    val memory = args(4).toInt 
    val cores = args(5).toInt 
 
    //akka配置信息 
    val configStr = 
      s""" 
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
         |akka.remote.netty.tcp.hostname = "$host" 
         |akka.remote.netty.tcp.port = "$port" 
      """.stripMargin 
 
    // 配置创建Actor需要的配置信息 
    val config: Config = ConfigFactory.parseString(configStr) 
 
    // 创建ActorSystem 
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config) 
 
    // 用actorSystem实例创建Actor 
    actorSystem.actorOf(Props(new Worker( 
      host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR) 
 
    actorSystem.awaitTermination() 
  } 
} 

  

    3.2 测试运行

由于Master 和Worker的运行都是使用main方法参数传入相应的主机名端口等参数,所以在运行前要在IDEA中的Edit Configurations 窗口中传入相应的参数。在本次测试中,我指定的参数如图:

【Master端】

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

【Worker端】

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

【运行结果】

1. 先运行Master,可以看到一旦运行Master,就启动了定时器检查超时Worker,因为还没有Worker进行注册,所以结果一直为0

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

2. 启动Worker

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

3. 启动Worker后,再看Master的窗口可以发现Worker注册成功,并且数量为1

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

4. 关闭Worker,此时Worker已经宕掉了,可以发现Master窗口会收到一条警告信息,并且Master在定时检查超时Worker的时候移除了过期未收到心跳的Worker

Spark编程实战之模拟SparkRPC原理实现自定义RPC详解大数据

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9133.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论