Spark 是分布式计算框架,多台机器之间必然存在着通信。Spark在早期版本采用Akka实现。现在在Akka的上层抽象出了一个RpcEnv。RpcEnv负责管理机器之间的通信。
RpcEnv包含了如下三大核心:
-
RpcEndpoint 消息循环体,负责接收并处理消息。Spark中的Master、Worker都是RpcEndpoint 。
-
RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEndpointRef发送消息。
-
Dispatcher:消息调度器,负责RPC消息路由到适当的RpcEndpoint。
RpcEnv被创建以后,RpcEndpoint可以注册到RpcEnv中,被注册的RpcEndpoint会生成一个相应的RpcEndpointRef来引用它。如果你需要向RpcEndpoint发送消息,必须到RpcEnv中通过RpcEndpoint的名称来获取对应的RpcEndpointRef,然后通过RpcEndpointRef向RpcEndpoint发送消息。
RpcEnv负责管理RpcEndpoint的整个生命周期
-
注册RpcEndpoint,使用name或者uri
-
路由发送给RpcEndpoint的消息。
-
停止RpcEndpoint
注:一个RpcEndpoint只能注册给一个RpcEnv
RpcAddress:RpcEnv的逻辑地址,使用主机名和端口表示。
RpcEndpointAddress:注册到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name构成。
由此可见RpcEnv和RpcEndpoint是在相同的机器上(相同的JVM中)。而要想给远端机器发送消息,是获取远端机器的RpcEndpointRef,而并不是远端的RpcEndpoint注册到本地的RpcEnv中。
在Spark1.6版本中,默认使用的是netty
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = { val rpcEnvNames = Map( "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory", "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory") val rpcEnvName = conf.get("spark.rpc", "netty") val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName) Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory] }
RpcEndpoint是一个消息循环体,它的生命周期:
构造(Constructor)->启动(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)
receive():不断的运行,处理客户端发送过来的消息。
receiveAndReply():处理消息,并且回应对方。
我们看一下Master的代码:
def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) //指定的主机名必须是start-master.sh脚本运行的本地机器名称 val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) rpcEnv.awaitTermination() } /** * Start the Master and return a three tuple of: * (1) The Master RpcEnv * (2) The web UI bound port * (3) The REST server bound port, if any */ def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) //创建Rpc环境,主机名和端口就是Standalone集群的访问地址。SYSTEM_NAME=sparkMaster val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) // 将Master实例注册到RpcEnv中 val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) }
在main方法中创建了RpcEnv,并且实例化Master实例,然后注册到RpcEnv中。
RpcEndpoint其实是注册到Dispatcher中的,在netty中的代码实现如下:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) }
注:NettyRpcEnv.scala的第135行
而Dispatcher中使用如下数据结构来存储RpcEndpoint和RpcEndpointRef
private val endpoints = new ConcurrentHashMap[String, EndpointData] private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
EndpointData为一个case class:
private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val inbox = new Inbox(ref, endpoint) }
在Master中使用数据结构WorkerInfo保存着每个Worker的信息,其中就包括每个Worker的RpcEndpointRef
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/193659.html