本篇文章给大家分享的是有关Spark是如何实现资源管理器比如yarn等可插拔的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
作为Spark源码阅读爱好者,有谁想过Spark是如何实现资源管理器比如yarn等可插拔的呢?
其实,在这里不得不说一下,spark1.6及之前,资源管理器还是不可插拔,代码是写死在sparkContext类里的,你要想增加一种资源管理器,必须要修改SparkContext的代码。
spark2.以后开始可以实现资源管理器的热插拔,主要工具是ServiceLoader。本文就给大家揭示一下。
ServiceLoader与ClassLoader是Java中2个即相互区别又相互联系的加载器.JVM利用ClassLoader将类载入内存,这是一个类声明周期的第一步(一个java类的完整的生命周期会经历加载、连接、初始化、使用、和卸载五个阶段,当然也有在加载或者连接之后没有被初始化就直接被使用的情况)。详情请参阅:详解Java类的生命周期
那ServiceLoader又是什么呢?ServiceLoader:一个简单的服务提供者加载设施。服务 是一个熟知的接口和类(通常为抽象类)集合。服务提供者 是服务的特定实现。提供者中的类通常实现接口,并子类化在服务本身中定义的子类。服务提供者可以以扩展的形式安装在 Java 平台的实现中,也就是将 jar 文件放入任意常用的扩展目录中。也可通过将提供者加入应用程序类路径,或者通过其他某些特定于平台的方式使其可用。……唯一强制要求的是,提供者类必须具有不带参数的构造方法,以便它们可以在加载中被实例化。
通过在资源目录META-INF/services中放置提供者配置文件 来标识服务提供者。文件名称是服务类型的完全限定二进制名称。该文件包含一个具体提供者类的完全限定二进制名称列表,每行一个。忽略各名称周围的空格、制表符和空行。注释字符为'#'('/u0023', NUMBER SIGN);忽略每行第一个注释字符后面的所有字符。文件必须使用 UTF-8 编码。
以延迟方式查找和实例化提供者,也就是说根据需要进行。服务加载器维护到目前为止已经加载的提供者缓存。每次调用 iterator 方法返回一个迭代器,它首先按照实例化顺序生成缓存的所有元素,然后以延迟方式查找和实例化所有剩余的提供者,依次将每个提供者添加到缓存。可以通过 reload 方法清除缓存。
以上来源于Java API里的说明,也许说的很专业,让我们有点晕头转向,我们可以简单的认为:ServiceLoader也像ClassLoader一样,能装载类文件,但是使用时有区别,具体区别如下:
(1) ServiceLoader装载的是一系列有某种共同特征的实现类,而ClassLoader是个万能加载器;
(2)ServiceLoader装载时需要特殊的配置,使用时也与ClassLoader有所区别;
(3)ServiceLoader还实现了Iterator接口。
[如有错误或不到的地方敬请指出,互相学习:)]
链接:https://www.cnblogs.com/sparkbj/articles/6208328.html
首先看一下SparkContext内部初始化管理器的代码
// 创建和启动调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
主要类方法是createTaskScheduler,其中有片段是通过url来找到资源管理器的。
case masterUrl => val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) }
getClusterManager内部实现了资源管理器的加载。
private def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { throw new SparkException( s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption }
然后我们可以找到相关配置了。
以上就是Spark是如何实现资源管理器比如yarn等可插拔的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/223239.html