31.union
Merges two RDDs into a single RDD.
def union(other: RDD[T]): RDD[T] = withScope {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    // Both RDDs share the same partitioner
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    // The partitioners differ
    new UnionRDD(sc, Array(this, other))
  }
}
Let's first look at how the case where the two partitioners are the same is handled:
private[spark] class PartitionerAwareUnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]]
  ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
  require(rdds.length > 0)
  require(rdds.forall(_.partitioner.isDefined))
  require(rdds.flatMap(_.partitioner).toSet.size == 1,
    "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

  override val partitioner = rdds.head.partitioner

  // Build one PartitionerAwareUnionRDDPartition per output partition; each records which
  // partitions of the parent RDDs the output partition with the given index comes from
  override def getPartitions: Array[Partition] = {
    val numPartitions = partitioner.get.numPartitions
    (0 until numPartitions).map(index => {
      new PartitionerAwareUnionRDDPartition(rdds, index)
    }).toArray
  }

  // Get the location where most of the partitions of parent RDDs are located
  override def getPreferredLocations(s: Partition): Seq[String] = {
    logDebug("Finding preferred location for " + this + ", partition " + s.index)
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    val locations = rdds.zip(parentPartitions).flatMap {
      case (rdd, part) => {
        val parentLocations = currPrefLocs(rdd, part)
        logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
        parentLocations
      }
    }
    val location = if (locations.isEmpty) {
      None
    } else {
      // Find the location that maximum number of parent partitions prefer
      Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
    }
    logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
    location.toSeq
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    // parents points at the parent partitions this output partition is built from
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    // Iterate over the corresponding partition of each parent RDD and concatenate them
    rdds.zip(parentPartitions).iterator.flatMap {
      case (rdd, p) => rdd.iterator(p, context)
    }
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }

  // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
  private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
    rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
  }
}
Its partition metadata is PartitionerAwareUnionRDDPartition:
class PartitionerAwareUnionRDDPartition(
    @transient val rdds: Seq[RDD[_]],
    val idx: Int
  ) extends Partition {
  // parents holds, for output partition index idx, the corresponding Partition of every RDD in rdds.
  // The mapping is one-to-one: e.g. output partition 0 is built from partition 0 of every parent RDD.
  var parents = rdds.map(_.partitions(idx)).toArray

  override val index = idx
  override def hashCode(): Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent partition at the time of task serialization
    parents = rdds.map(_.partitions(index)).toArray
    oos.defaultWriteObject()
  }
}
In other words, when the partitioners are the same, each partition of the resulting RDD is built from the corresponding partition of each of the two original RDDs, so both the partition count and the partitioner are preserved.
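A minimal sketch of this behavior (not from the original post; it assumes a SparkContext named sc, e.g. the one provided by spark-shell):

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(3)
val a = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c")).partitionBy(p)
val b = sc.parallelize(Seq(2 -> "x", 4 -> "y")).partitionBy(p)

val u = a.union(b)                 // takes the PartitionerAwareUnionRDD branch
println(u.partitions.length)       // 3 -- same as each parent, not 3 + 3
println(u.partitioner == Some(p))  // true -- the partitioner is preserved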
Now let's look at how the case where the two partitioners differ is handled:
class UnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]])
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    // The number of output partitions is the sum of the parents' partition counts
    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
    var pos = 0
    // Create one output partition per parent partition; each maps back to exactly one of them
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      // pos: output partition index, rdd: the parent RDD of this partition,
      // rddIndex: index of that parent within rdds, split.index: partition index within that parent
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
      pos += rdd.partitions.length
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    val part = s.asInstanceOf[UnionPartition[T]]
    // Read the data of the corresponding partition of the corresponding parent RDD
    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
  }

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}
Now let's see what parentPartition in UnionPartition represents:
/**
 * Partition for UnionRDD.
 *
 * @param idx index of the partition
 * @param rdd the parent RDD this partition refers to
 * @param parentRddIndex index of the parent RDD this partition refers to
 * @param parentRddPartitionIndex index of the partition within the parent RDD
 *                                this partition refers to
 */
private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient rdd: RDD[T],
    val parentRddIndex: Int,
    @transient parentRddPartitionIndex: Int)
  extends Partition {

  // parentPartition is the Partition at index parentRddPartitionIndex of the parent RDD
  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)

  override val index: Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    parentPartition = rdd.partitions(parentRddPartitionIndex)
    oos.defaultWriteObject()
  }
}
Therefore, when the two partitioners differ, UnionRDD simply concatenates the parents' partitions: each output partition reads exactly one partition of one parent, and the total partition count is the sum of the parents' partition counts.
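For comparison, a small sketch of the UnionRDD branch (again assuming a SparkContext sc):

val x = sc.parallelize(1 to 10, 2)   // 2 partitions, no partitioner
val y = sc.parallelize(11 to 20, 3)  // 3 partitions, no partitioner

val u = x.union(y)                   // takes the UnionRDD branch
println(u.partitions.length)         // 5 = 2 + 3: parent partitions are concatenated
println(u.partitioner)               // None -- no partitioner is carried over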
32. ++ (double plus)
It is simply an alias for union.
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def ++(other: RDD[T]): RDD[T] = withScope {
  this.union(other)
}
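A quick usage sketch (assuming a SparkContext sc); duplicates survive unless distinct() is applied:

val left  = sc.parallelize(Seq(1, 2, 3))
val right = sc.parallelize(Seq(3, 4))

println((left ++ right).collect().toList)                    // List(1, 2, 3, 3, 4)
println((left ++ right).distinct().collect().sorted.toList)  // List(1, 2, 3, 4)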
33.intersection
Computes the intersection of two RDDs; values common to both appear only once in the result.
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}
The rough flow: the two RDDs being intersected are first mapped to pair RDDs whose values are null; cogroup then builds a CoGroupedRDD and converts its values into a pair of iterables (one per side); the records are filtered so that only keys present in both RDDs survive; finally the keys are returned, which are exactly the original elements of the left and right RDDs.
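The same flow can be written out by hand; the following sketch (assuming a SparkContext sc) shows that it matches the built-in operator:

val left  = sc.parallelize(Seq(1, 2, 2, 3))
val right = sc.parallelize(Seq(2, 3, 3, 4))

// Built-in operator: the result contains no duplicates.
println(left.intersection(right).collect().sorted.toList)   // List(2, 3)

// The same steps spelled out, following the implementation above.
val manual = left.map(v => (v, null))
  .cogroup(right.map(v => (v, null)))
  .filter { case (_, (ls, rs)) => ls.nonEmpty && rs.nonEmpty }
  .keys
println(manual.collect().sorted.toList)                      // List(2, 3)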
Internally, cogroup is implemented as follows:
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  // defaultPartitioner chooses the default partitioner for the two RDDs
  cogroup(other, defaultPartitioner(self, other))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  // Map each value of cg (an Array of two buffers) to a pair of Iterables
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
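As a quick illustration of the result shape (a sketch assuming a SparkContext sc), every key that appears in either RDD gets a pair of iterables, one per side:

val scores = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3))
val labels = sc.parallelize(Seq("a" -> "x", "c" -> "y"))

scores.cogroup(labels).collect().sortBy(_._1).foreach {
  case (k, (vs, ws)) => println(s"$k -> (${vs.toList}, ${ws.toList})")
}
// a -> (List(1, 2), List(x))
// b -> (List(3), List())
// c -> (List(), List(y))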
CoGroupedRDD itself is implemented as follows:
/**
 * :: DeveloperApi ::
 * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
 * tuple with the list of values for that key.
 *
 * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
 * instantiating this directly.
 *
 * @param rdds parent RDDs.
 * @param part partitioner used to partition the shuffle output
 */
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

  // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
  // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
  // CoGroupValue is the intermediate state of each value before being merged in compute.
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]

  private var serializer: Option[Serializer] = None

  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = Option(serializer)
    this
  }

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
      if (rdd.partitioner == Some(part)) {
        // If a parent is already partitioned by the same partitioner, the dependency is narrow
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        // Otherwise it is a shuffle (wide) dependency
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
      }
    }
  }

  /*
   * Build the partition metadata, CoGroupPartition(idx: Int, narrowDeps: Array[Option[NarrowCoGroupSplitDep]]),
   * where idx is the output partition index and narrowDeps records the narrow dependencies it relies on.
   */
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          // For a shuffle dependency there is no narrow split to record
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            // Otherwise it is a narrow dependency on partition i of the parent
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

  override val partitioner: Some[Partitioner] = Some(part)

  // Compute one output partition from its CoGroupPartition metadata
  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val sparkConf = SparkEnv.get.conf
    // Decides whether intermediate merging happens purely in memory or in memory + on disk
    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
    val split = s.asInstanceOf[CoGroupPartition]
    // Number of parent RDDs; each contributes one dependency, chosen by the partitioner check above
    val numRdds = dependencies.length

    // A list of (rdd iterator, dependency number) pairs:
    // each entry pairs an iterator over a parent's Product2 records with that parent's index
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
      // Narrow dependency: read the corresponding partition directly from the parent RDD
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        // Read them from the parent
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))

      // Shuffle dependency: fetch this partition's data from the shuffle map outputs
      case shuffleDependency: ShuffleDependency[_, _, _] =>
        // Read map outputs of shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }

    /*
     * rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     * e.g. [(Iterator[Product2[K, Any]], 0), (Iterator[Product2[K, Any]], 1)]
     */
    if (!externalSorting) {
      // Merge the intermediate results purely in memory
      val map = new AppendOnlyMap[K, CoGroupCombiner]  // CoGroupCombiner is an array of buffers; each key is kept only once
      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
      }
      val getCombiner: K => CoGroupCombiner = key => {
        map.changeValue(key, update)
      }
      // Walk every iterator and append each value into the CoGroup of its key and dependency number
      rddIterators.foreach { case (it, depNum) =>
        while (it.hasNext) {
          val kv = it.next()
          getCombiner(kv._1)(depNum) += kv._2
        }
      }
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    } else {
      // Merge the intermediate results in memory, spilling to disk when necessary
      val map = createExternalMap(numRdds)
      // Insert everything into the ExternalAppendOnlyMap
      for ((it, depNum) <- rddIterators) {
        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      }
      context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
  }

  ……
}
So suppose two RDDs are cogrouped, one hash-partitioned into 3 partitions and the other with no partitioner: the hash-partitioned parent is read through a narrow (one-to-one) dependency, while the unpartitioned parent is shuffled into the 3 output partitions.
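This mixed case can be observed directly by instantiating the developer-API CoGroupedRDD. A sketch, assuming a SparkContext sc and the Spark 1.x class shown above (normally you would call cogroup instead of constructing it yourself):

import org.apache.spark.{HashPartitioner, OneToOneDependency, ShuffleDependency}
import org.apache.spark.rdd.CoGroupedRDD

val p = new HashPartitioner(3)
val partitioned   = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(p)  // hash-partitioned, 3 partitions
val unpartitioned = sc.parallelize(Seq(1 -> "x", 3 -> "y"))                 // no partitioner

val cg = new CoGroupedRDD[Int](Seq(partitioned, unpartitioned), p)
cg.dependencies.foreach {
  case _: OneToOneDependency[_]      => println("narrow: parent already partitioned by p")
  case _: ShuffleDependency[_, _, _] => println("shuffle: parent must be repartitioned")
}
// narrow: parent already partitioned by p
// shuffle: parent must be repartitioned
println(cg.partitions.length)  // 3 -- one output partition per partition of p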
34.glom
glom coalesces all of the elements within each partition into a single array, producing a new RDD of arrays (a MapPartitionsRDD).
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  // Iterator(iter.toArray) turns each partition's iterator into a single array
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
Its execution is a purely per-partition (narrow) transformation: every element of a partition ends up in one array, one array per output partition.
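A small sketch (assuming a SparkContext sc):

val nums = sc.parallelize(1 to 6, 3)   // 3 partitions: [1,2], [3,4], [5,6]
val arrays = nums.glom()               // RDD[Array[Int]] with one array per partition

arrays.collect().foreach(a => println(a.toList))
// List(1, 2)
// List(3, 4)
// List(5, 6)
println(arrays.partitions.length)      // still 3 -- glom does not change the partitioning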
35.cartesian
This operation returns the Cartesian product of two RDDs; it does not perform a shuffle.
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
It is implemented by CartesianRDD; let's keep going:
private[spark] class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1: RDD[T],
    var rdd2: RDD[U])
  extends RDD[Pair[T, U]](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  // The number of output partitions is the product of the two parents' partition counts
  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      // Output partition idx is built from partition s1.index of rdd1 and partition s2.index of rdd2
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  // Build this partition's data by pairing every element of rdd1's s1 partition
  // with every element of rdd2's s2 partition
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  // Both dependencies are narrow
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
Concretely, the result has rdd1.partitions.length * rdd2.partitions.length partitions, and output partition idx pairs every element of rdd1's partition idx / numPartitionsInRdd2 with every element of rdd2's partition idx % numPartitionsInRdd2.
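A small sketch (assuming a SparkContext sc) that confirms the partition count and the pairing:

val letters = sc.parallelize(Seq("a", "b"), 2)  // 2 partitions
val numbers = sc.parallelize(Seq(1, 2, 3), 3)   // 3 partitions

val prod = letters.cartesian(numbers)
println(prod.partitions.length)  // 6 = 2 * 3: one output partition per pair of parent partitions
prod.collect().sorted.foreach(println)
// (a,1), (a,2), (a,3), (b,1), (b,2), (b,3) -- printed one per line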