1.take
获取前num条记录。
def take(num: Int): Array[T] = withScope { val left = num – buf.size res.foreach(buf ++= _.take(num – buf.size)) buf.toArray |
首先关注下sc.runJob函数的传参:
/**
|
其中partitions: Seq[Int]代表需要计算的分区,可以计算某个分区,也可以计算多个分区,是待计算的分区集合。
其次先看第一次循环,其partsScanned为0,numPartsToTry为1,因此先计算第一个分区的结果,如果第一次计算可以取得满足条件的num个值,则循环结束,如果取不到满足条件的num个值,则扩大第二次计算的分区范围,很可能一下子扫多个分区。
其执行过程见下图:
Take可以避免全量计算,执行时间比较短。但可能会多次触发action。
2.first
取RDD的第一个元素
/** } |
其实就是调用take来完成的,take的流程可以查阅take函数详解
3.sortByKey
def sortByKey(ascending: Boolean =true, numPartitions: Int = self.partitions.length) |
sortByKey其实就是根据父RDD生成ShuffledRDD的过程,其分区函数为范围分区RangePartitioner,执行过程如下:
父RDD的每个分区按照分区函数RangePartitioner将每个分区的数据划分为多个分区的数据,然后ShuffledRDD拉取自己对应分区的数据。但是sortByKey主要应该掌握其RangePartitioner分区函数的执行原理,它如何保证ShuffledRDD的每个分区的数量是大致相同的,也就是如何来划分每个分区的边界的,且看:
class RangePartitioner[K: Ordering : ClassTag, V]( // We allow partitions = 0, which happens when sorting an empty RDD under the default settings. private var ordering= implicitly[Ordering[K]] // An array of upper bounds for the first (partitions – 1) partitions前(partitions – 1)的分区边界 def numPartitions: Int = rangeBounds.length +1 private var binarySearch: ((Array[K],K) => Int) = CollectionsUtils.makeBinarySearch[K] def getPartition(key: Any): Int = { /** /** } |
可见其分区边界由rangeBounds保存,然后提供getPartition函数,根据传入的key获取其对于的分区编号。那么现在的问题就是当不知道父RDD的每个分区总数的情况下,如何保证数据被随机抽样出来,只有数据随机被抽样出来,才能保证之后切分分区的时候每个分区的数目是大致相同的。(这样就可以只扫描一次获取随机值,否则需要先扫描出总数,然后根据总数来抽样,这样就扫描了2次)
首先抛出个数学算法,即 Reservoir Sampling(水塘抽样),目的在于从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。
具体推导过程这里不详细描述,直接写结论:
在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于k/n,替换池中任意一个为第n个数。大于k/n,继续保留前面的数。直到数据流结束,返回此k个数。但是为了保证计算机计算分数额准确性,一般是生成一个0到n的随机数。
//stream代表数据流 //reservoir代表返回长度为k的池塘 //从stream中取前k个放入reservoir; for ( int i = 1; i < k; i++) reservoir[i] = stream[i]; for (i = k; stream != null; i++) { p = random(0, i); if (p < k) reservoir[p] = stream[i]; return reservoir; |
接下来看其具体的执行过程:
private var rangeBounds: Array[K] = { // sampleSizePerPartition代表的是每个分区抽样的值,然后针对待排序的key值进行抽样,即sketch函数 //如果存在数据倾斜的情况,则某些分区包含数据量多的情况下,抽样的值偏少,需要增加抽样的数目 //针对不平衡的分区继续抽样 //根据各个分区抽样的值来划分边界,其中weight值反应某个key的权重,权重越大,说明该key值越多 |
其中sketch函数:
def sketch[K: ClassTag](
def reservoirSampleAndCount[T: ClassTag]( //先取前K个值 vari = 0 // If we have consumed all the elements, return them. Otherwise do the replacement. |
最后调用determineBounds来确定分界值:
def determineBounds[K: Ordering : ClassTag]( } |
其划分边界的具体流程如下:
因此执行RDD的sortbykey操作,会导致对其key的至少一次扫描,比较耗时间,对外表现就是会执行一次action操作。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9303.html