Spark算子执行流程详解之六大数据

26.coalesce

coalesce顾名思义为合并,就是把多个分区的RDD合并成少量分区的RDD,这样可以减少任务调度的时间,但是请记住:合并之后不能保证结果RDD中的每个分区的记录数量是均衡的,因为合并的时候并没有考虑合并前每个分区的记录数,合并只会减少RDD的分区个数,因此并不能利用它来解决数据倾斜的问题。

def coalesce(numPartitions: Int, shuffle: Boolean =false)(implicitord:Ordering[T] =null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
   
val distributePartition = (index: Int, items:Iterator[T]) => {
      var position = (newRandom(index)).nextInt(numPartitions)//针对不同的分区索引初始化一个随机数

//将原来的记录映射为(K,记录)对,其中K为随机数的不断叠加
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
       
position = position + 1
       
(position, t)
      }
    } : Iterator[(Int, T)]
    // include a shuffle step so that our upstream tasks are still distributed

//针对(k,记录)进行一次Hash分区
   
new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values//由于是KV对,最后再取其V即可
  } else {
    new CoalescedRDD(this, numPartitions)
  }

}

先看其shuffle参数,如果为true的话,则先生成一个ShuffleRDD,然后在这基础上产生CoalescedRDD,如果为false的话,则直接生成CoalescedRDD。因此先看下其ShuffleRDD的生成过程:

Spark算子执行流程详解之六大数据

以上是将3个分区合并成2个分区,当shuffle为true的时候,其CoalescedRDD父RDD即ShuffledRDD的生成过程,如果shuffle为false的时候,则直接利用其本身取生成CoalescedRDD。

       再来看CoalescedRDD的计算过程:

private[spark] classCoalescedRDD[T: ClassTag](
    @transient varprev: RDD[T],
    maxPartitions: Int,
    balanceSlack: Double = 0.10)
  extends RDD[T](prev.context,Nil) { // Nil since we implement getDependencies
 
override def getPartitions: Array[Partition] = {
    val pc = newPartitionCoalescer(maxPartitions, prev, balanceSlack)
    pc.run().zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.arr.map(_.index).toArray
        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
  }
  override def compute(partition: Partition, context: TaskContext):Iterator[T] = {
    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
      firstParent[T].iterator(parentPartition, context)
    }
  }
  ……

}

/**
 * Class that captures a coalesced RDD by essentially keeping track of parent partitions
 *
@param index of this coalesced partition
 *
@param rdd which it belongs to

* parentsIndices它代表了当前CoalescedRDD对应分区索引的分区是由父RDD的哪几个分区组成的
 *
@param parentsIndices list of indices in the parent that have been coalesced into this partition

* @param preferredLocation the preferred location for this partition
 */
private[spark] case class CoalescedRDDPartition(
    index: Int,
    @transient rdd: RDD[_],
    parentsIndices: Array[Int],
    @transient preferredLocation: Option[String] = None)extendsPartition {
  var parents:Seq[Partition] =parentsIndices.map(rdd.partitions(_))

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException{
    // Update the reference to parent partition at the time of task serialization
   
parents
= parentsIndices.map(rdd.partitions(_))
    oos.defaultWriteObject()
  }

  /**
   * Computes the fraction of the parents’ partitions containing preferredLocation within
   * their getPreferredLocs.
   *
@return locality of this coalesced partition between 0 and 1
   */
 
def localFraction: Double = {
    val loc = parents.count { p =>
      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
      preferredLocation.exists(parentPreferredLocations.contains)
    }
    if (parents.size ==0)0.0 else(loc.toDouble /parents.size.toDouble)
  }
}

CoalescedRDD的分区结果由CoalescedRDDPartition决定,其中parentsIndices参数代表了CoalescedRDD的某个分区索引的分区来源于其父RDD的哪几个分区,然后就是利用flatMap把父RDD的多个分区串联起来。因此主要关注CoalescedRDD是如何生成CoalescedRDDPartition的,即

override def getPartitions: Array[Partition] = {
  val pc = newPartitionCoalescer(maxPartitions, prev, balanceSlack)

  pc.run().zipWithIndex.map {
    case (pg, i) =>
      val ids = pg.arr.map(_.index).toArray
      new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
  }

}

通过PartitionCoalescer来计算生成CoalescedRDDPartition:

/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 *
@return array of partition groups
 */
def run(): Array[PartitionGroup] = {

//设置一个个group
  setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
 
 //然后把父rdd的还没有分配的partition放置到一个个group

throwBalls() // assign partitions (balls) to each group (bins)
 
getPartitions

}

首先生成一个个PartitionGroup,里面的arr保存了父rdd的分区索引,然后把其他父rdd没有分配的分区投放至PartitionGroup里面。先看setupGroups的过程,它首先生成targetLen个PartitionGroup,里面包含了初始默认的父rdd的分区索引,其流程如下:

/** 
 * Initializes targetLen partition groups and assigns a preferredLocation 
 * This uses coupon collector to estimate how many preferredLocations it must rotate through 
 * until it has seen most of the preferred locations (2 * n log(n)) 
 * @param targetLen 
 */ 
  def setupGroups(targetLen: Int) { 
  val rotIt = new LocationIterator(prev) 
  // deal with empty case, just create targetLen partition groups with no preferred location
//如果父RDD的分区没有本地性,则直接生成targetLenPartitionGroup返回 
  if (!rotIt.hasNext) { 
    (1 to targetLen).foreach(x => groupArr += PartitionGroup()) 
    return 
  } 
  noLocality = false 
  // number of iterations needed to be certain that we've seen most preferred locations 
  val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt 
  var numCreated = 0 
  var tries = 0 
  // rotate through until either targetLen unique/distinct preferred locations have been created 
  // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations, 
  // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
//优先针对每台主机建立其对应的PartitionGroup,目的是为了让之后的计算更加分散 
  while (numCreated < targetLen && tries < expectedCoupons2) { 
    tries += 1 
    // rotIt.next()的返回值为(String, Partition),其中nxt_replica为主机名 nxt_part为分区索引 
    val (nxt_replica, nxt_part) = rotIt.next() 
    if (!groupHash.contains(nxt_replica)) { 
      val pgroup = PartitionGroup(nxt_replica) 
      groupArr += pgroup 
      addPartToPGroup(nxt_part, pgroup)//将其分区索引添加进此PartitionGroup 
        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple 
      numCreated += 1 
    } 
  }
//如果还没有足够多的PartitionGroup,实在不行则针对同一个主机名可以创建多个PartitionGroup 
  while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates 
    //(String, Partition) 主机名 分区索引 
    var (nxt_replica, nxt_part) = rotIt.next() 
    val pgroup = PartitionGroup(nxt_replica) 
    groupArr += pgroup 
    //  val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]() (主机名,PartitionGroup(主机名)),同一个主机名可能存在多个PartitionGroup 
    groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup 
    var tries = 0
//将其分区索引添加进此PartitionGroup 
    while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part 
      nxt_part = rotIt.next()._2 
      tries += 1 
    } 
    numCreated += 1 
  }
}

然后将剩余没有分配的父rdd的分区分配至对应的PartitionGroup

def throwBalls() { 
  if (noLocality) {  // no preferredLocations in parent RDD, no randomization needed 没有本地性,少分区合并成多分区,无法合并,保持原样 
      if (maxPartitions > groupArr.size) { // just return prev.partitions 
      for ((p, i) <- prev.partitions.zipWithIndex) { 
        groupArr(i).arr += p 
      } 
    } else { // no locality available, then simply split partitions based on positions in array 
      for (i <- 0 until maxPartitions) {//否则无本地性要求的情况下,简单的按区间进行合并 
          val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt 
        val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt 
        (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) } 
      } 
    } 
  } else {//遍历父rdd的分区,且之前没有被分配过,则进行分配 
    for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group 
      //选择某个PartitionGroup,然后添加至arr
pickBin(p).arr += p 
    } 
  } 
}

那么pickBin是如何计算的呢?且看:

/** 
 * Takes a parent RDD partition and decides which of the partition groups to put it in 
 * Takes locality into account, but also uses power of 2 choices to load balance 
 * It strikes a balance between the two use the balanceSlack variable 
 * @param p partition (ball to be thrown) 
 * @return partition group (bin to be put in) 
 */ 
  def pickBin(p: Partition): PartitionGroup = {
//获取父rdd的该Partition的本地性所在主机的列表,并按其包含的分区数目从少到多排序 
    val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
//如果没有列表,则返回none,如果有,则返回最少的那个主机名的PartitionGroup 
  val prefPart = if (pref == Nil) None else pref.head 
    //随机选择2个PartitionGroup中包含分区数目最小的PartitionGroup 
  val r1 = rnd.nextInt(groupArr.size) 
  val r2 = rnd.nextInt(groupArr.size) 
  val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
  if (prefPart.isEmpty) {//如果无本地性要求,则返回minPowerOfTwo 
    // if no preferred locations, just use basic power of two 
    return minPowerOfTwo 
  } 
   
  val prefPartActual = prefPart.get 
  //否则根据平衡因子来选择 
  if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows 
    minPowerOfTwo  // prefer balance over locality 
  } else { 
    prefPartActual // prefer locality over balance 
  }
}
因此合并的原则就是:
1.保证CoalescedRDD的每个分区个数相同
2.CoalescedRDD的每个分区,尽量跟它的Parent RDD的本地性相同。比如说CoalescedRDD的分区1对应于它的Parent RDD的1到10这10个分区,但是1到7这7个分区在节点1.1.1.1上,那么 CoalescedRDD的分区1所要执行的节点就是1.1.1.1。这么做的目的是为了减少节点间的数据通信,提升处理能力。
3.CoalescedRDD的分区尽量分配到不同的节点执行
比如说:
1)3个分区合并成2个分区,shuffle为true


ShuffleRDD的getPreferredLocations为Nil

2)2个分区合并成3个分区,shuffle为true


ShuffleRDD的getPreferredLocations为Nil
3)5个分区合并成3个分区,shuffle为 false,父RDD的每个分区都包含本地性


4)5个分区合并成3个分区,shuffle为 false,父RDD的每个分区不包含本地性
 

5)3个分区合并成5个分区,shuffle为 false,父RDD的每个分区都包含本地性

 
6)3个分区合并成5个分区,shuffle为 false,父RDD的每个分区不包含本地性

 

27.repartition

对RDD重分区,重分区之后的分区个数为numPartitions。
/** 
 * Return a new RDD that has exactly numPartitions partitions. 
 * 
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 
 * a shuffle to redistribute data. 
 * 
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 
 * which can avoid performing a shuffle. 
 */ 
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 
  coalesce(numPartitions, shuffle = true) 
}
可见其本质调用的还是coalesce,但是其shuffle参数为true,因为如果为false,则有可能获取不到指定分区个数的rdd

28.sample

Sample是对rdd中的数据集进行采样,并生成一个新的RDD,这个新的RDD只有原来RDD的部分数据,这个保留的数据集大小由fraction来进行控制. 
/**  * Return a sampled subset of this RDD.  *  * @param withReplacement can elements be sampled multiple times (replaced when sampled out)  * @param fraction expected size of the sample as a fraction of this RDD's size  *  without replacement: probability that each element is chosen; fraction must be [0, 1]  *  with replacement: expected number of times each element is chosen; fraction must be >= 0  * @param seed seed for the random number generator  */   def sample(     withReplacement: Boolean,     fraction: Double,     seed: Long = Utils.random.nextLong): RDD[T] = withScope {   require(fraction >= 0.0, "Negative fraction value: " + fraction)   if (withReplacement) {     new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)   } else {     new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)   } }
withReplacement:这个值如果是true时,采用PoissonSampler抽样器(Poisson分布),否则使用BernoulliSampler的抽样器.
Fraction:一个大于0,小于或等于1的小数值,用于控制要读取的数据所占整个数据集的概率.
Seed:这个值如果没有传入,默认值是一个0~Long.maxvalue之间的整数.
至于那个PoissonSampler抽样器和BernoulliSampler抽样器,数学理论比较深,大家感兴趣可以百度相关资料查看其抽样原理,这里不详细叙述其抽样的内部原理。
继续看PartitionwiseSampledRDD: 
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](     prev: RDD[T],     sampler: RandomSampler[T, U],     @transient preservesPartitioning: Boolean,     @transient seed: Long = Utils.random.nextLong)   extends RDD[U](prev) {   @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None   override def getPartitions: Array[Partition] = {     val random = new Random(seed)     firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))   }   override def getPreferredLocations(split: Partition): Seq[String] =     firstParent[T].preferredLocations(split.asInstanceOf[PartitionwiseSampledRDDPartition].prev)   override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = {     val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]     val thisSampler = sampler.clone     thisSampler.setSeed(split.seed)
//调用不同的抽样器针对每个分区进行抽样     thisSampler.sample(firstParent[T].iterator(split.prev, context))   } }
 

29.takeSample

takeSample函数返回一个数组,在数据集中随机采样 num 个元素组成。
/**  * Return a fixed-size sampled subset of this RDD in an array  *  * @param withReplacement whether sampling is done with replacement  * @param num size of the returned sample  * @param seed seed for the random number generator  * @return sample of specified size in an array  */ // TODO: rewrite this without return statements so we can wrap it in a scope   def takeSample(     withReplacement: Boolean,     num: Int,     seed: Long = Utils.random.nextLong): Array[T] = {   val numStDev = 10.0      if (num < 0) {//num为负数,直接抛异常     throw new IllegalArgumentException("Negative number of elements requested")   } else if (num == 0) {//num为0的话,直接返回空数组     return new Array[T](0)   }      val initialCount = this.count()   if (initialCount == 0) {//如果rdd为空的话,那么就直接返回空数组       return new Array[T](0)   }   //计算最大允许的采样个数   val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt   if (num > maxSampleSize) {
throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +       s"$numStDev * math.sqrt(Int.MaxValue)")   }      val rand = new Random(seed)
//如果不支持重复采样,且采样总数大于rdd的个数,则直接把rdd的数据集混洗完返回   if (!withReplacement && num >= initialCount) {     return Utils.randomizeInPlace(this.collect(), rand)   }   //为了利用sample方法,需要计算采样百分比   val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,     withReplacement)   //尝试进行第一次采样   var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()      // If the first sample didn't turn out large enough, keep trying to take samples;   // this shouldn't happen often because we use a big multiplier for the initial size   var numIters = 0
//如果采样返回的个数不满足条件,则继续利用sample进行采样   while (samples.length < num) {     logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")     samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()     numIters += 1   }   //将结果集混洗完取前num条数据   Utils.randomizeInPlace(samples, rand).take(num)
}
 

30.randomSplit

依据所提供的权重对该RDD进行随机划分
def randomSplit(     weights: Array[Double],     seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {   val sum = weights.sum   val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)   normalizedCumWeights.sliding(2).map { x =>     randomSampleWithRange(x(0), x(1), seed)   }.toArray }      /**  * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability  * range.  * @param lb lower bound to use for the Bernoulli sampler  * @param ub upper bound to use for the Bernoulli sampler  * @param seed the seed for the Random number generator  * @return A random sub-sample of the RDD without replacement.  */   private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {   this.mapPartitionsWithIndex( { (index, partition) =>     val sampler = new BernoulliCellSampler[T](lb, ub)     sampler.setSeed(seed + index)     sampler.sample(partition)   }, preservesPartitioning = true)
}
假设按照以下进行采样:

List<Integer> data = Arrays.asList(1,2,4,3,5,6,7);

JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);

double [] weights =  {1,2,3,4};

//依据所提供的权重对该RDD进行随机划分

JavaRDD<Integer> [] randomSplitRDDs = javaRDD.randomSplit(weights);
先进行权重的计算,即normalizedCumWeights=[0.0,0.1,0.3,0.6,1.0],然后调用normalizedCumWeights.sliding(2)将其两两分组,即转化为[0.0,0.1],[0.1,0.3],[0.3,0.6],[0.6,0.1],接着利用伯努利采样器进行采样,即: 
class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false) 
  extends RandomSampler[T, T] { 
  /** epsilon slop to avoid failure from floating point jitter. */ 
  require( 
    lb <= (ub + RandomSampler.roundingEpsilon), 
    s"Lower bound ($lb) must be <= upper bound ($ub)") 
  require( 
    lb >= (0.0 - RandomSampler.roundingEpsilon), 
    s"Lower bound ($lb) must be >= 0.0") 
  require( 
    ub <= (1.0 + RandomSampler.roundingEpsilon), 
    s"Upper bound ($ub) must be <= 1.0") 
  private val rng: Random = new XORShiftRandom 
  override def setSeed(seed: Long): Unit = rng.setSeed(seed)
//利用sample进行采样 
  override def sample(items: Iterator[T]): Iterator[T] = { 
    if (ub - lb <= 0.0) { 
      if (complement) items else Iterator.empty 
    } else { 
      if (complement) { 
        items.filter { item => { 
          val x = rng.nextDouble() 
          (x < lb) || (x >= ub) 
        }} 
      } else {//正常走这个分之,其complement为false
//其实就是利用rng生成随机数,然后判断其范围,是否在[lb,ub)区间范围之内,是的话就保留,否则就抛弃 
        items.filter { item => { 
          val x = rng.nextDouble() 
          (x >= lb) && (x < ub) 
        }} 
      } 
    } 
  } 
   
  /** 
   *  Return a sampler that is the complement of the range specified of the current sampler. 
   */ 
  def cloneComplement(): BernoulliCellSampler[T] = 
    new BernoulliCellSampler[T](lb, ub, !complement) 
   
  override def clone: BernoulliCellSampler[T] = new BernoulliCellSampler[T](lb, ub, complement)
}
则以上采样的结果可能为,权重大的获取到的数据量就大,但相互之间不一定成比例,只代表一种概率
scala> randomSplitRDDs (0).collect
res10: Array[Int] = Array(1, 4)
scala> randomSplitRDDs (1).collect
res11: Array[Int] = Array(3)                                                    
scala> randomSplitRDDs (2).collect
res12: Array[Int] = Array(5, 9)
scala> randomSplitRDDs (3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)
 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9298.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论