Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]详解大数据

SparkPlan如何执行呢,SparkPlan是如何转变为RDD[Row]的呢?首先看一段代码:

// Build a SQLContext on top of the existing Spark context (jsc).
SQLContext sqlContext = new SQLContext(jsc);
// Load the Parquet file as a DataFrame and register it as a temporary table.
DataFrame dataFrame = sqlContext.parquetFile(parquetPath);
// NOTE(review): `source` must hold the table name used by the query below ("test") -- confirm.
dataFrame.registerTempTable(source);
String sql = " select SUM(id) from test group by dev_chnid ";
DataFrame result = sqlContext.sql(sql);
// collect() is the action that triggers execution. Concatenating the returned Row[] directly
// would only log the array's identity (e.g. "[Lorg.apache.spark.sql.Row;@1a2b3c"), so the
// rows are rendered with Arrays.toString instead.
log.info("Result:" + java.util.Arrays.toString(result.collect()));
 
/**
 * Returns the rows obtained by calling `executeCollect()` on the executed plan of this
 * query's `queryExecution`.
 */
override def collect(): Array[Row] =
  queryExecution.executedPlan.executeCollect()
/**
 * Executes this plan via `execute()`, converts every row of each partition with the
 * converter built from `schema`, and collects the converted rows into an array.
 */
def executeCollect(): Array[Row] = {
  val convertedRows = execute().mapPartitions { partition =>
    // One converter is created per partition and reused for all of its rows.
    val toScala = CatalystTypeConverters.createToScalaConverter(schema)
    partition.map(catalystRow => toScala(catalystRow).asInstanceOf[Row])
  }
  // collect() is what actually launches the job over the RDD returned by execute().
  convertedRows.collect()
}
/**
 * Returns all elements of this RDD as a single array: a job turns each partition's
 * iterator into an array, and the per-partition arrays are then concatenated.
 */
def collect(): Array[T] = withScope {
  val partitionArrays = sc.runJob(this, (elements: Iterator[T]) => elements.toArray)
  Array.concat(partitionArrays: _*)
}

查看SparkPlan的execute函数:

/**
 * Excerpt of the abstract physical-plan node. Members replaced by "……" are elided
 * and not shown here.
 */
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  ……
  /**
   * Final entry point for execution: runs `doExecute()` inside
   * `RDDOperationScope.withScope`, passing this plan's `sparkContext` and `nodeName`.
   *
   * @return the RDD[Row] returned by this node's `doExecute()`
   */
final def execute(): RDD[Row] = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      doExecute()// delegates to the doExecute() implemented by each concrete SparkPlan
    }
  }
……
}

可以看到,每个具体的SparkPlan都会封装一个doExecute函数,其输出为RDD[Row]。就拿select SUM(id) from test group by dev_chnid语句来说,其executedPlan为:

Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L] 
 Exchange (HashPartitioning 200) 
  Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L] 
   PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD

先看下Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]的doExecute函数:

/**
 * Executes this Aggregate node by running the child plan and aggregating its rows
 * partition by partition.
 *
 * - Without grouping expressions, each partition folds all of its rows into one aggregate
 *   buffer and emits exactly one result row.
 * - With grouping expressions, each partition keeps one aggregate buffer per grouping key
 *   in a hash table and emits one result row per key.
 *
 * @return the RDD[Row] of aggregated rows
 */
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
  if (groupingExpressions.isEmpty) {// no grouping expressions
    child.execute().mapPartitions { iter =>// run the child's execute()
      val buffer = newAggregateBuffer()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        var i = 0
        while (i < buffer.length) {// feed the row to every aggregate function (one global group)
          buffer(i).update(currentRow)
          i += 1
        }
      }
      val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
      val aggregateResults = new GenericMutableRow(computedAggregates.length)

      // Evaluate each aggregate function into its slot of the result row.
      var i = 0
      while (i < buffer.length) {
        aggregateResults(i) = buffer(i).eval(EmptyRow)
        i += 1
      }

      // A single output row for this partition.
      Iterator(resultProjection(aggregateResults))
    }
  } else {
    child.execute().mapPartitions { iter =>// run the child's execute()
      // Grouping key -> aggregate buffers for that key.
      // NOTE(review): get() returning null and entrySet() below suggest java.util.HashMap --
      // confirm against the file's imports.
      val hashTable = new HashMap[Row, Array[AggregateFunction]]
      // For the example query:
      //groupingExpressions = [dev_chnid#0]
      //child.output = [dev_chnid#0,id#17L]
      val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)

      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        val currentGroup = groupingProjection(currentRow)
        var currentBuffer = hashTable.get(currentGroup)
        if (currentBuffer == null) {
          // First row seen for this key: create its buffers. The key is copied before being
          // stored (presumably because the mutable projection reuses its output row -- confirm).
          currentBuffer = newAggregateBuffer()
          hashTable.put(currentGroup.copy(), currentBuffer)
        }

        var i = 0
        while (i < currentBuffer.length) {
          currentBuffer(i).update(currentRow)// accumulate the aggregates for this row's group
          i += 1
        }
      }

      // Iterator over the hash table that emits one projected result row per grouping key.
      new Iterator[Row] {
        private[this] val hashTableIter = hashTable.entrySet().iterator()
        private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
        private[this] val resultProjection =
          new InterpretedMutableProjection(
            resultExpressions, computedSchema ++ namedGroups.map(_._2))
        private[this] val joinedRow = new JoinedRow4

        override final def hasNext: Boolean = hashTableIter.hasNext

        override final def next(): Row = {
          val currentEntry = hashTableIter.next()
          val currentGroup = currentEntry.getKey
          val currentBuffer = currentEntry.getValue

          var i = 0
          while (i < currentBuffer.length) {
            // Evaluating an aggregate buffer returns the result.  No row is required since we
            // already added all rows in the group using update.
            aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
            i += 1
          }
          resultProjection(joinedRow(aggregateResults, currentGroup))// result row for this group
        }
      }
    }
  }
}

Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]的child为Exchange (HashPartitioning 200),即:

/**
 * Excerpt of the Exchange physical operator, which repartitions the output of `child`
 * according to `newPartitioning`. Only the HashPartitioning case is shown; the cases
 * replaced by "……" are elided.
 *
 * @param newPartitioning the partitioning to apply to the child's rows
 * @param newOrdering     when non-empty, a key ordering is set on the shuffled RDD
 * @param child           the plan whose output is repartitioned
 */
case class Exchange(
    newPartitioning: Partitioning,
    newOrdering: Seq[SortOrder],
    child: SparkPlan)
  extends UnaryNode {
/**
 * Executes the child, pairs every row with the key computed from the partitioning
 * expressions, shuffles the pairs with a HashPartitioner and returns only the row values.
 */
protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
  newPartitioning match {
    case HashPartitioning(expressions, numPartitions) =>
      val keySchema = expressions.map(_.dataType).toArray
      val valueSchema = child.output.map(_.dataType).toArray
      val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
      val part = new HashPartitioner(numPartitions)
      val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
        // Copying path: both the computed key and the row are copied for every record.
        child.execute().mapPartitions { iter =>
          val hashExpressions = newMutableProjection(expressions, child.output)()
          iter.map(r => (hashExpressions(r).copy(), r.copy()))
        }
      } else {
        // Non-copying path: a single MutablePair is updated in place for every record.
        child.execute().mapPartitions { iter =>
          val hashExpressions = newMutableProjection(expressions, child.output)()
          val mutablePair = new MutablePair[Row, Row]()
          iter.map(r => mutablePair.update(hashExpressions(r), r))
        }
      }
      val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)// rdd is the child's output; part defines the partitioning rule
      if (newOrdering.nonEmpty) {
        // NOTE(review): keyOrdering is not defined in this excerpt -- presumably in the elided part.
        shuffled.setKeyOrdering(keyOrdering)
      }
      shuffled.setSerializer(serializer)
      // Drop the key and keep only the original row.
      shuffled.map(_._2)
    ……
    case _ => sys.error(s"Exchange not implemented for $newPartitioning")
    // TODO: Handle BroadcastPartitioning.
  }
}
}

Exchange返回的ShuffledRDD的输入为其child的输出,即:

Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L],以此类推,接下来我们看下最后那个SparkPlan:PhysicalRDD

/**
 * Leaf physical plan node that wraps an already-built RDD.
 *
 * @param output the attributes this node exposes as its output
 * @param rdd    the RDD returned unchanged by `doExecute()`
 */
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  protected override def doExecute(): RDD[Row] = rdd// simply returns the rdd passed to the constructor
}

PhysicalRDD的doExecute直接返回其构造函数的rdd,写得太抽象了吧,因此我们来追踪下这个rdd究竟是什么。

/**
 * Excerpt of the planning strategy that turns logical relations into physical scans.
 * Cases replaced by "……" are elided; only the HadoopFsRelation case is shown.
 */
private[sql] object DataSourceStrategy extends Strategy with Logging {
  /**
   * Plans a physical operator for `plan`. In the case shown, a PhysicalOperation over a
   * HadoopFsRelation is planned through `pruneFilterProject`, with the scan produced by
   * the relation's `buildScan`.
   */
  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
      ……
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
// The Hadoop configuration is wrapped in a SerializableWritable and broadcast, then
// handed to buildScan.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
  t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
pruneFilterProject(
  l,
  projectList,
  filters,
  (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
……
  }
  /**
   * Adapts a scan builder that takes column names and data-source `Filter`s to the
   * attribute/expression based builder expected by `pruneFilterProjectRaw`.
   *
   * @param relation         the logical relation being scanned
   * @param projectList      the expressions to project
   * @param filterPredicates the filter expressions to apply
   * @param scanBuilder      builds the scan RDD from column names and filters
   */
  protected def pruneFilterProject(
    relation: LogicalRelation,
    projectList: Seq[NamedExpression],
    filterPredicates: Seq[Expression],
    scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
  pruneFilterProjectRaw(
    relation,
    projectList,
    filterPredicates,
    (requestedColumns, pushedFilters) => {
      // Attributes become their names; expressions go through selectFilters.
      scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
    })
  }
  /**
   * Builds the physical plan for a scan: a PhysicalRDD created from `scanBuilder`,
   * wrapped in a Filter when there are filter predicates, and additionally in a Project
   * when the projection cannot be expressed by column pruning alone.
   *
   * @param relation         the logical relation being scanned
   * @param projectList      the expressions to project
   * @param filterPredicates the filter expressions to apply
   * @param scanBuilder      builds the scan RDD from the requested attributes and filters
   */
  protected def pruneFilterProjectRaw(
    relation: LogicalRelation,
    projectList: Seq[NamedExpression],
    filterPredicates: Seq[Expression],
    scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {

  val projectSet = AttributeSet(projectList.flatMap(_.references))
  val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
  // All predicates combined with And; None when there are no predicates.
  val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

  val pushedFilters = filterPredicates.map { _ transform {
    case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
  }}
  if (projectList.map(_.toAttribute) == projectList &&
      projectSet.size == projectList.size &&
      filterSet.subsetOf(projectSet)) {
    // When it is possible to just use column pruning to get the right projection and
    // when the columns of this projection are enough to evaluate all filter conditions,
    // just do a scan followed by a filter, with no extra project.
    val requestedColumns =
      projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
        .map(relation.attributeMap)            // Match original case of attributes.
    val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
        scanBuilder(requestedColumns, pushedFilters))
    filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
  } else {
    // Request every attribute referenced by either the projection or the filters, then
    // apply the filter (if any) and the projection on top of the scan.
    val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq

    val scan = createPhysicalRDD(relation.relation, requestedColumns,
      scanBuilder(requestedColumns, pushedFilters))
    execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
  }
}
/**
 * Wraps `rdd` in a PhysicalRDD with the given output attributes, first passing it through
 * `RDDConversions.rowToRowRdd` when the relation reports `needConversion`.
 */
private[this] def createPhysicalRDD(
    relation: BaseRelation,
    output: Seq[Attribute],
    rdd: RDD[Row]): SparkPlan = {
  val converted = if (relation.needConversion) {
    execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
  } else {
    rdd
  }
  execution.PhysicalRDD(output, converted)
}
}

可见其rdd本质上是buildScan的返回值,spark根据不同的HadoopFsRelation编写不同的buildScan,基于Spark1.4.0的Parquet文件对应的HadoopFsRelation为ParquetRelation2,即:

private[sql] class ParquetRelation2( 
    override val paths: Array[String], 
    private val maybeDataSchema: Option[StructType], 
    // This is for metastore conversion. 
    private val maybePartitionSpec: Option[PartitionSpec], 
    override val userDefinedPartitionColumns: Option[StructType], 
    parameters: Map[String, String])( 
    val sqlContext: SQLContext) 
  extends HadoopFsRelation(maybePartitionSpec) 
  with Logging { 
/**
 * Builds the RDD that scans the given Parquet input files.
 *
 * Creates an anonymous subclass of SqlNewHadoopRDD that reads through
 * FilteringParquetRowInputFormat and returns its `values` (the rows, without the keys).
 *
 * @param requiredColumns names of the columns to read
 * @param filters         data-source filters passed to the local job initialization function
 * @param inputFiles      statuses of the files to scan
 * @param broadcastedConf broadcast Hadoop configuration used by the RDD
 * @return an RDD[Row] over the requested columns of the input files
 */
override def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputFiles: Array[FileStatus],
    broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
  val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
  val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
  // Create the function to set variable Parquet confs at both driver and executor side.
  val initLocalJobFuncOpt =
    ParquetRelation2.initializeLocalJobFunc(
      requiredColumns,
      filters,
      dataSchema,
      useMetadataCache,
      parquetFilterPushDown) _
  // Create the function to set input paths at the driver side.
  val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

  // Look up the footer of every input file in the metadata cache.
  val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

  Utils.withDummyCallSite(sqlContext.sparkContext) {
    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
    // and footers. Especially when a global arbitrative schema (either from metastore or data
    // source DDL) is available.
    new SqlNewHadoopRDD(
      sc = sqlContext.sparkContext,
      broadcastedConf = broadcastedConf,
      initDriverSideJobFuncOpt = Some(setInputPaths),
      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
      inputFormatClass = classOf[FilteringParquetRowInputFormat],
      keyClass = classOf[Void],
      valueClass = classOf[Row]) {
      val cacheMetadata = useMetadataCache
      // File statuses rebuilt with escaped paths; marked @transient, so not serialized
      // with the RDD.
      @transient val cachedStatuses = inputFiles.map { f =>
        // In order to encode the authority of a Path containing special characters such as '/'
        // (which does happen in some S3N credentials), we need to use the string returned by the
        // URI of the path to create a new Path.
        val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
        new FileStatus(
          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
      }.toSeq
      // Footers rebuilt with escaped paths, likewise @transient.
      @transient val cachedFooters = footers.map { f =>
        // In order to encode the authority of a Path containing special characters such as /,
        // we need to use the string returned by the URI of the path to create a new Path.
        new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
      }.toSeq
      // Rebuilds the path from its URI components, using the raw (still escaped) user info.
      private def escapePathUserInfo(path: Path): Path = {
        val uri = path.toUri
        new Path(new URI(
          uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
          uri.getQuery, uri.getFragment))
      }
      // Overridden so we can inject our own cached files statuses.
      override def getPartitions: Array[SparkPartition] = {
        // With metadata caching on, the input format returns the cached statuses and footers
        // instead of the defaults; otherwise a plain FilteringParquetRowInputFormat is used.
        // NOTE(review): returning Scala Seqs as JList relies on an implicit conversion that is
        // not visible in this excerpt -- confirm against the file's imports.
        val inputFormat = if (cacheMetadata) {
          new FilteringParquetRowInputFormat {
            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
          }
        } else {
          new FilteringParquetRowInputFormat
        }
        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
        val rawSplits = inputFormat.getSplits(jobContext)
        // One Spark partition per input split.
        Array.tabulate[SparkPartition](rawSplits.size) { i =>
          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
        }
      }
    }.values
  }
}
其返回一个SqlNewHadoopRDD,至此已经将sql语言转换为Spark的计算模型即rdd,接下来就是根据不同的rdd切分不同的stage,然后提交stage内的task来进行运算了,最后我们来看下其具体的执行情况:

Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]详解大数据

由于先局部聚合,然后全局聚合,因此job 0 被切分为2个Stage,stage 0 和stage 1,如下:

Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]详解大数据

Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]详解大数据

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9307.html

(0)
上一篇 2021年7月19日 09:24
下一篇 2021年7月19日 09:24

相关推荐

发表回复

登录后才能评论