大数据计算框架Spark的任务调度是怎么实现的

大数据计算框架Spark的任务调度是怎么实现的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Spark有几种资源调度设施。每个Spark Application(SparkContext实例)独立地运行在一组executor进程内。cluster  manager为应用间的调度提供设施。在每个Spark应用内,如果将多个job(多个spark action)提交给不同的线程,那么他们会并行运行。

1 Application间的资源调度

集群上,每个Spark application获得独立的一组executor JVM,这组executor  JVM只为那个application运行task和存储数据。如果多个用户要共享集群,有不同的策略管理资源分配,这取决于使用的cluster  manager。

资源的静态分区(static partitioning)可被所有的cluster  manager获得,这样每个application在他的生命周期内都可获得他能使用的最多资源。standalone、YARN、coarse-grained  Mesos mode这三种模式使用的就是这种方式。

1.1控制资源使用

集群类型下,如下配置资源分配:

  1. Standalone mode:application提交到standalone  mode集群,将会以FIFO的顺序运行,每个application会尽可能地使用所有可用节点,配置spark.cores.max来限制application使用节点的数目,或者设置spark.deploy.defaultCores。除了可以设置application可用内核数,还可以设置spark.executor.memory来控制内存的使用。

  2. Mesos:为了使用静态分区(static  partitioning)在Mesos集群上,spark.mesos.coarse=true,可以通过设置spark.cores.max来限制每个application的资源共享,通过设置spark.executor.memory来控制executor内存的使用。

  3. YARN:通过设置–num-executors选项,spark  YARN客户端可控制集群上有多少executor被分配(对应的配置属性为spark.executor.instances),–executor-memory(对应的配置属性spark.executor.memory)和–executor-cores(对应的配置属性spark.executor.cores)控制了分配给每个executor的资源。

应用之间无法共享内存。

1.2动态资源分配

Spark提供了依据应用的工作量动态调整资源的机制。这意味着你的application不在使用的资源会返还给集群,当需要的时候再申请分配资源,这种特性对于多应用共享集群特别有用。

这个特性默认失效,但在所有coarse-grained cluster manager上都可用,如:standalone mode, YARN mode,  和Mesos coarse-grained mode。

使用这个特性有两个要求。首先用于必须设置spark.dynamicAllocation.enabled=true,其次要设置external  shuffle service在集群上的每个worker  node并设置spark.shuffle.service.enabled=true。设置external shuffle  service目的是executor可被移除但是不删除他们生成的shuffle文件。

设置这个变量的方式为:

  • 在standalone模式:设置spark.shuffle.service.enabled=true

  • Mesos  coarse-grained模式:在所有从节点运行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh设置spark.shuffle.service.enabled=true

  • YARN:详见运行spark与YARN

1.3资源分配策略

当Spark不再使用executor时就出让它,需要的时候再获取它。因为没有一个确定的方式预测将要被移除的executor是否在不久的将来会被使用,或者一个将要被添加的新executor实际上是否是空闲的,所以我们需要一系列试探来确定是移除executor(可能会移除多个)还是请求executor(可能会请求多个)。

请求策略

开启Spark application动态分配资源特性,当pending task等待被调度时,Spark  application会请求额外的executor。这就意味着,当前的这些executor无法同时满足所有的task,这些task已经被提交,但是还没有执行完。

Spark轮流请求executor。当task等待的时间大于spark.dynamicAllocation.schedulerBacklogTimeout时,真正的请求(申请executor的请求)被触发,之后,如果未完成task队列存在,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒请求被触发一次。每一轮请求的executor数量以指数级增长。例如,***轮请求一个executor,第二轮请求2个,第三,四轮分别请求4,8个。

按指数形式增长的动机有两个,首先,起初应用应该慎重地请求executor,以防只需几个executor就能满足需求,这和TCP慢启动类似。其次,当应用确实需要更多的executor时,应用应该能够及时地增加资源的使用。

移除策略

当executor闲置超过spark.dynamicAllocation.executorIdleTimeout秒时,就将他移除。注意,大多数情况下,executor的移除条件和请求条件是互斥的,这样如果仍然有待调度的task的情况下executor是不会被移除的。

executor优雅地退役

非动态分配资源情况下,一个Spark  executor或者是由于失败而退出,或者是因相关application退出而退出。这两种情况下,不在需要与executor相关联的状态并且这些状态可以被安全地丢弃。动态分配资源的情况下,当executor被明确移除时,application仍然在运行。如果application要想使用这些由executor存储和写下的状态,就必须重新计算状态。这样就需要一种优雅的退役机制,即在executor退役前保留他的状态。

这个机制对于shuffles特别重要。shuffle期间,executor自己的map输出写入本地磁盘。当其他的executor要获取这些文件的时候,这个executor充当了文件服务器的角色。对于那些落后的executor,他们的task执行时间比同辈要长,在shuffle完成之前,动态资源分配可能移除了一个executor,这种情形下,那个executor写入本地的文件(即executor的状态)不必重新计算。

保留shuffle文件的办法就是使用外部的shuffle服务,这是在Spark  1.2中引入的。这个外部的shuffle服务指的是长时间运行的进程,它运行与集群的每个节点上,独立于application和executor。如果这个服务可用,executor就从这个服务获shuffle  file,而不是彼此之间获取shuffle  file。这意味着executor生成的任何shuffle文件都可能被服务包含,即使在executor生命周期之外也是如此。

executor除了写shuffle  文件到本地硬盘,还缓存数据到硬盘或内存中。但是,当executor被移除后,缓存到内存中的数据将不可用。为了解决这一问题,默认地缓存数据到内存的executor永远不会被删除。可以通过spark.dynamicAllocation.cachedExecutorIdleTimeout配置这一行为,

2 Application内的资源调度

概述

给定的application内部(SparkContext  实例),如果多个并行的job被提交到不同的线程上,那么这些job可以同时执行。这里的job指的是Spark action及Spark  action触发的计算task。Spark scheduler是线程安全的,支持spark application服务于多个请求。

默认地Spark  scheduler以FIFO的顺序执行job,每个job被切分为一到多个stage(例如,map和reduce),当***个job的stage的task启动后,这个job优先获得所有可用资源,然后才是第二,三个job……。如果队头的job不必使用整个集群,之后的job就能立即启动。如果队头的job较大,那么之后的job启动延迟会比较明显。

从Spark  0.8开始,也可以通过配置实现队列间的公平调度。Job间的task资源分配采用单循环的方式。所有job都会获得大致相同的集群资源。这就意味着,当有长job存在时,提交的短job可以立即获得资源启动运行而不必等到长job执行完毕。可以设置spark.scheduler.mode为FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)  conf.set("spark.scheduler.mode", "FAIR")  val sc = new SparkContext(conf)

公平调度池(可能多个)

公平调度器也支持在池中对job分组并给每个池配置不同的选项。这有助于为更重要的job设置高优先级池,例如把每个用户的job分到一组,并且给这些用户相等的资源不论有多少并行task,而不是给每个job相等的资源。

不需要任何干预,新job会进入默认池,但是可以使用spark.scheduler.pool设置job池。

sc.setLocalProperty("spark.scheduler.pool", "pool1")

设置完后,这个线程(通过调用RDD.save, count,  collect)提交的所有job都会使用这个资源池的名称。设置是针对每一个线程的,这样更容易实现一个线程运行一个用户的多个job。如果想清除与一个线程相关的池,调用:sc.setLocalProperty("spark.scheduler.pool",  null)

池默认行为

默认地每个池都能获得相等的资源(在默认池中每个job都能获得相等的资源),但在每个池内部,job以FIFO  的顺序运行。例如如果为每一个用户创建一个池,这就意味着每一个用户将获得相等的资源,并且每个用户的查询都会按顺序运行而不会出现后来的查询抢占了前面查询的资源

配置池属性

可以通过修改配置文件改变池属性。每个池都支持三种属性:

  • schedulingMode:可以是FIFO或FAIR,控制池中的job排队等候或公平地分享集群资源。

  • weight:控制资源分配的比例。默认所有池分配资源比重都是1。如果指定一个池的比重为2,那么他获得的资源是其他池的2倍。如果将一个池的比重设的很高,比如1000,那么不论他是否有活跃的job,他总是***个开始执行task。

  • minShare:除了设置总体的占比之外,还可以对每个池设定一个最小资源分配(例如CPU核数)。在根据比重重新分配资源之前,公平调度器总是试图满足所有活跃池的最小资源需求。minShare属性能以另一种方式确保一个池快速地获得一定数量的资源(10个核)而不必给他更高的优先级。默认地minShare=0。

调用SparkConf.set,可以通过XML文件配置池属性:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

每个池一个,在XML文件中没有配置的池使用默认配置(调度模式 FIFO, weight 1, minShare 0),例如:

<?xml version="1.0"?><allocations>  <pool name="production">  <schedulingMode>FAIR</schedulingMode>  <weight>1</weight>  <minShare>2</minShare>  </pool>  <pool name="test">  <schedulingMode>FIFO</schedulingMode>  <weight>2</weight>  <minShare>3</minShare>  </pool></allocations>

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/220436.html

(0)
上一篇 2022年1月2日
下一篇 2022年1月2日

相关推荐

发表回复

登录后才能评论