这篇文章将为大家详细讲解有关Spark2.x中怎么实现CacheManager源码深度剖析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
一、概述
CacheManager主要发生在利用RDD的数据执行算子的时候,之前我们讲过在ShufffleWriter进行数据写时,会调用RDD对应的Iterator()方法,获取RDD对应的数据,CacheManager主要干三件事:
a. 管理Spark的缓存,可以基于内存,也可以基于磁盘;
b.底层是通过BlockManager进行数据的读写操作;
c.Task运行会调用RDD中的iterator方法进行数据的计算;
二、CacheManager源码剖析
1.之前我们讲解的ShuffleMapTask中的runTask方法时,ShuffleWriter写数据的参数传入的是rdd.iterator()方法计算出来的那个partition数据,代码如下:
var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //这里就是ShuffleMapTask类的runTask()方法中对应的代码调用 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { ................... }
2.这里看RDD类中的iterator方法,代码如下:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//判断下如果StorageLevel.NONE这说明RDD,之前肯定是进行了持久化
//getOrCompute中会通过CacheManager获取之前持久化的数据
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
//如果没有进行过持久化,就需要通过父RDD定义的算子去获取数据
//注意这里如果有CheckPoint,会通过CheckPoint获取,checkPoint获取不到才去重新计算
} else {
computeOrReadCheckpoint(split, context)
}
}
3.跟进去看下持久化的RDD的处理,getOrCompute()函数,代码如下:
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = { val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true //CacheManger这里是通过BlockManager获取持久化数据, //如果获取成功直接返回,如果获取失败,调用computeOrReadCheckpoint进行计算 //内存数据为啥会丢失? 之前我们知道内存中的数据如果空间不够的话,同时如果指定可以将数据缓存到磁盘,会溢写到磁盘, //如果未指定溢写到磁盘,这些数据就会丢失掉 就需要重新计算 SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { readCachedBlock = false //获取不到重新计算,这里要注意,代码执行到这里说明这个RDD肯定是经过持久化的 //这里计算出数据后,会在getOrElseUpdate里面通过makeIterator参数对数据进行重新持久化(这里理解的不太透彻) computeOrReadCheckpoint(partition, context) }) match { case Left(blockResult) => if (readCachedBlock) { val existingMetrics = context.taskMetrics().inputMetrics existingMetrics.incBytesRead(blockResult.bytes) new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) { override def next(): T = { existingMetrics.incRecordsRead(1) delegate.next() } } } else { new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) } case Right(iter) => new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) } }
4.这里继续跟踪getOrElseUpdate()获取持久化的数据 ,代码如下:
//这里会调用get()方法从本地或者远程获取block数据,直接返回//如果没有读取到数据就需要重新计算数据,由于代码执行到这里,rdd肯定是经过持久化的//这里计算出数据后,通过makeIterator迭代器,重新进行持久化(这里理解的不太透彻) def getOrElseUpdate[T]( blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. //这里会调用get()方法从本地或者远程获取block数据,直接返回 get[T](blockId)(classTag) match { case Some(block) => return Left(block) case _ => // Need to compute the block. } //这里的处理意思是:对于本地远程没有获取到数据,然后computeOrReadCheckpoint重新计算的数据 //由于RDD是持久化的,原来的持久化数据可能丢了,这里根据持久化级别重新进行数据的持久化 //这里代码有点不太好理解 要结合上面2中第12-14行代码 一起理解 doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match { case None => // doPut() didn't hand work back to us, so the block already existed or was successfully // stored. Therefore, we now hold a read lock on the block. val blockResult = getLocalValues(blockId).getOrElse { // Since we held a read lock between the doPut() and get() calls, the block should not // have been evicted, so get() not returning the block indicates some internal error. releaseLock(blockId) throw new SparkException(s"get() failed for block $blockId even though we held a lock") } // We already hold a read lock on the block from the doPut() call and getLocalValues() // acquires the lock again, so we need to call releaseLock() here so that the net number // of lock acquisitions is 1 (since the caller will only call release() once). releaseLock(blockId) Left(blockResult) case Some(iter) => // The put failed, likely because the data was too large to fit in memory and could not be // dropped to disk. Therefore, we need to pass the input iterator back to the caller so // that they can decide what to do with the values (e.g. process them without caching). Right(iter) } }
5.这里回过头来看computeOrReadCheckpoint方法,如果计算数据的,代码如下:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { //如果设置了CheckPoint,从Checkpoint中获取数据 //这里CheckPoint相关的知识,先不讲解,后面有篇文章单独讲解 if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { //如果数据没有进行过Checkpoint,这里只能重新计算一次 //这里就是根据自己的rdd算子重新计算 compute(split, context) } }
6.CacheManager数据计算的大体流程:
1).如果RDD进行过持久化,根据持久化级别通过BlockManager从本地或者远程获取数据,如果数据获取不到,则需要重新计算,由于这里RDD进行过持久化,只是由于某种原因丢失,还需要根据持久化级别重新进行一次数据的持久化。
2).如果RDD没有进行持久化,就需要重新计算,重新计算时,这里如果RDD进行了CheckPoint,则优先获取CheckPoint过的数据,如果没有,则需要从RDD的父RDD执行我们定义的算子来重新计算Partition数据。
关于Spark2.x中怎么实现CacheManager源码深度剖析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/223222.html