Spark算子执行流程详解之四大数据

17.map

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

利用映射函数f针对其父RDD的每个元素进行处理,继续往下看MapPartitionsRDD

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
   
preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

注意关注其compute函数,取得是firstParent[T].iterator,即其第一个父RDD的迭代器,因为RDD是分区的,因此其迭代器需要传入分区索引进去,即split:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {//如果是需要缓存的,则读缓存或者计算(第一次的时候)
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {//否则的话重新计算或者读检查点的数据
    computeOrReadCheckpoint(split, context)
  }

}

 

18.mapPartitions

针对RDD的每个分区进行处理,返回一个新的RDD
/** 
 * Return a new RDD by applying a function to each partition of this RDD. 
 * 
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which 
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. 
 */ 
  def mapPartitions[U: ClassTag]( 
    f: Iterator[T] => Iterator[U], 
    preservesPartitioning: Boolean = false): RDD[U] = withScope { 
  val cleanedF = sc.clean(f) 
  new MapPartitionsRDD( 
    this, 
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), 
    preservesPartitioning)
}

注意preservesPartitioning参数,如果子RDD需要保留父RDD的分区信息的话,则必须设置为true,否则经过转化之后的RDD的partitioner为None了。

同时需要注意的是:map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。

19.mapPartitionsWithIndex

和mapPartitions类似,只是在转化函数中会把分区索引传入进入

/*
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn’t modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),//会将分区索引index传入进来
    preservesPartitioning)

}

如果分区索引对于实际业务有意义的话,可以使用mapPartitionsWithIndex去进行相关转化。

20.flatMap

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

flatMap是针对RDD的每个元素利用函数f生成多个元素,然后把这些结果全部串联起来,其调用的本质就是调用迭代器的flatmap函数:

def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
 
def hasNext: Boolean =
    cur.hasNext || self.hasNext && { cur = f(self.next).toIterator; hasNext }
  def next(): B = (if (hasNext) cur else empty).next()
}

 

21.filter

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)

}

filter是针对RDD的每个元素利用函数f过滤,返回过滤后的结果:

/** Returns an iterator over all the elements of this iterator that satisfy the predicate `p`.
 *  The order of the elements is preserved.
 *
 * 
@param p the predicate used to test values.
 * 
@return  an iterator which produces those values of this iterator which satisfy the predicate `p`.
 * 
@note    Reuse: $consumesAndProducesIterator
 */
def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
  private var hd: A = _
  private var hdDefined: Boolean = false

  def hasNext: Boolean = hdDefined || {//利用函数p过滤记录,直到找到符合条件的记录为止
    do {
      if (!self.hasNext) return false
     
hd = self.next()
    } while (!p(hd))
    hdDefined = true
    true
 
}

  def next() = if (hasNext) { hdDefined = false; hd } else empty.next()

}

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9300.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论