17.map
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
map applies the mapping function f to every element of its parent RDD. Let's follow the call into MapPartitionsRDD:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}
Pay attention to the compute method: it calls firstParent[T].iterator, i.e. the iterator of the first parent RDD. Because an RDD is partitioned, the iterator must be told which partition to read, namely split:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // If the RDD is to be cached, read from the cache, or compute it (the first time)
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    // Otherwise recompute the partition, or read the checkpointed data
    computeOrReadCheckpoint(split, context)
  }
}
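A minimal sketch of how the caching branch behaves (assuming an existing SparkContext sc; the data is made up):

val doubled = sc.parallelize(1 to 10, 2).map(_ * 2)
doubled.cache()   // storageLevel becomes MEMORY_ONLY instead of NONE
doubled.count()   // first action: each partition is computed and put into the cache
doubled.count()   // second action: iterator() serves each partition from the cache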
18.mapPartitions
mapPartitions processes each partition of the RDD as a whole and returns a new RDD.
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
Note the preservesPartitioning parameter: if the child RDD should keep the parent RDD's partitioner, it must be set to true; otherwise the partitioner of the transformed RDD becomes None.
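A small sketch of the difference (assuming a SparkContext sc; the data is made up):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(2))
pairs.partitioner                    // Some(HashPartitioner)

// map never preserves the partitioner
pairs.map(identity).partitioner      // None

// mapPartitions can preserve it, as long as the function leaves the keys untouched
val upper = pairs.mapPartitions(
  _.map { case (k, v) => (k, v.toUpperCase) },
  preservesPartitioning = true)
upper.partitioner                    // Some(HashPartitioner)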
Also note the difference in granularity: map operates on every single element of the RDD, while mapPartitions (and foreachPartition) operates on the iterator of each partition. If the per-element work requires repeatedly creating expensive auxiliary objects (for example, writing the RDD's data to a database over JDBC: map would open one connection per element, whereas mapPartitions opens one connection per partition), mapPartitions is far more efficient than map, as sketched below.
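A sketch of the JDBC pattern just described (the URL, credentials and table are hypothetical, and rdd is assumed to be an RDD[String]):

rdd.foreachPartition { records =>
  // one connection per partition rather than one per element
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
  try {
    val stmt = conn.prepareStatement("INSERT INTO t (v) VALUES (?)")
    records.foreach { r =>
      stmt.setString(1, r)
      stmt.executeUpdate()
    }
  } finally {
    conn.close()
  }
}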
Spark SQL and DataFrame programs get this mapPartitions-style optimization by default.
19.mapPartitionsWithIndex
Similar to mapPartitions, except that the partition index is also passed into the transformation function:
/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), // the partition index is passed in here
    preservesPartitioning)
}
If the partition index is meaningful to your business logic, use mapPartitionsWithIndex for the transformation.
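For example, tagging every element with the partition it came from (a minimal sketch, assuming a SparkContext sc):

val tagged = sc.parallelize(1 to 6, 3).mapPartitionsWithIndex { (index, iter) =>
  iter.map(v => (index, v))
}
tagged.collect()  // Array((0,1), (0,2), (1,3), (1,4), (2,5), (2,6))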
20.flatMap
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
flatMap applies the function f to each element of the RDD to produce zero or more output elements, and then concatenates all the results. Under the hood it simply delegates to the iterator's flatMap:
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  def hasNext: Boolean =
    cur.hasNext || self.hasNext && { cur = f(self.next).toIterator; hasNext }
  def next(): B = (if (hasNext) cur else empty).next()
}
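For example (a minimal sketch, assuming a SparkContext sc):

val lines = sc.parallelize(Seq("hello spark", "hello world"))
lines.flatMap(_.split(" ")).collect()  // Array(hello, spark, hello, world)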
21.filter
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
filter evaluates the predicate f against each element of the RDD and returns only the elements that pass. Note that it constructs MapPartitionsRDD with preservesPartitioning = true: dropping elements never changes the key of any surviving element, so the parent's partitioner remains valid. The filtering itself happens in the iterator:
/** Returns an iterator over all the elements of this iterator that satisfy the predicate `p`.
 *  The order of the elements is preserved.
 *
 *  @param p the predicate used to test values.
 *  @return  an iterator which produces those values of this iterator which satisfy the predicate `p`.
 *  @note    Reuse: $consumesAndProducesIterator
 */
def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
  private var hd: A = _
  private var hdDefined: Boolean = false

  def hasNext: Boolean = hdDefined || {
    // pull records through the predicate p until a matching one is found
    do {
      if (!self.hasNext) return false
      hd = self.next()
    } while (!p(hd))
    hdDefined = true
    true
  }

  def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
}
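For example (a minimal sketch, assuming a SparkContext sc):

val evens = sc.parallelize(1 to 10).filter(_ % 2 == 0)
evens.collect()  // Array(2, 4, 6, 8, 10)

And because filter passes preservesPartitioning = true, filtering a partitioned pair RDD keeps its partitioner.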