How does a SparkPlan actually execute, and how does it turn into an RDD[Row]? Start with a piece of user code:
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.parquetFile(parquetPath);
dataFrame.registerTempTable("test");
String sql = "select SUM(id) from test group by dev_chnid";
DataFrame result = sqlContext.sql(sql);
log.info("Result:" + result.collect()); // collect() triggers the action
// DataFrame.collect
override def collect(): Array[Row] = {
val ret = queryExecution.executedPlan.executeCollect() // delegate to the executedPlan's executeCollect
ret
}
// SparkPlan.executeCollect
def executeCollect(): Array[Row] = {
execute().mapPartitions { iter =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
iter.map(converter(_).asInstanceOf[Row])
}.collect() // what ultimately runs is executedPlan.execute(), i.e. SparkPlan.execute, followed by RDD.collect
}
// RDD.collect in Spark core
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
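So the chain is DataFrame.collect → SparkPlan.executeCollect → SparkPlan.execute (which yields the RDD[Row]) → RDD.collect. The conversion step in executeCollect follows a very common pattern: build a converter once per partition, map every record through it, then collect to the driver. Here is a minimal, self-contained sketch of that pattern with my own stand-ins (a plain function replaces CatalystTypeConverters); it is not Spark source code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectPatternSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("collect-pattern").setMaster("local[2]"))

    // Stand-in for the RDD produced by executedPlan.execute(): one "internal" record per element.
    val internalRows = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)

    // Same shape as executeCollect: build a converter once per partition,
    // map every record through it, then pull everything back to the driver.
    val collected: Array[String] = internalRows.mapPartitions { iter =>
      val converter = (i: Int) => s"Row($i)" // stand-in for CatalystTypeConverters.createToScalaConverter
      iter.map(converter)
    }.collect()

    collected.foreach(println)
    sc.stop()
  }
}
```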
Now look at SparkPlan.execute:
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
……
final def execute(): RDD[Row] = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
doExecute() // dispatch to the concrete SparkPlan's doExecute
}
}
……
}
As we can see, every concrete SparkPlan implements a doExecute whose output is an RDD[Row]. Taking the statement select SUM(id) from test group by dev_chnid as our example, its executedPlan is:
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Exchange (HashPartitioning 200)
  Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
   PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD
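Incidentally, you can print this plan tree for any query yourself, using the result DataFrame from the opening example (shown in Scala; the Java DataFrame above exposes the same methods):

```scala
// Prints the physical plan; pass true to also print the parsed, analyzed and optimized plans.
result.explain(true)

// Or inspect the executed plan object directly (queryExecution is exposed as a DeveloperApi field).
println(result.queryExecution.executedPlan)
```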
First look at the doExecute of Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L], the final (global) aggregation:
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) { // no GROUP BY: compute a single global aggregate
child.execute().mapPartitions { iter => // run the child's execute
val buffer = newAggregateBuffer()
var currentRow: Row = null
while (iter.hasNext) {
currentRow = iter.next()
var i = 0
while (i < buffer.length) { // update the global aggregate values
buffer(i).update(currentRow)
i += 1
}
}
val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
val aggregateResults = new GenericMutableRow(computedAggregates.length)
var i = 0
while (i < buffer.length) {
aggregateResults(i) = buffer(i).eval(EmptyRow)
i += 1
}
Iterator(resultProjection(aggregateResults))
}
} else {
child.execute().mapPartitions { iter => // run the child's execute
val hashTable = new HashMap[Row, Array[AggregateFunction]]
//groupingExpressions = [dev_chnid#0]
//child.output = [dev_chnid#0,id#17L]
val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
var currentRow: Row = null
while (iter.hasNext) {
currentRow = iter.next()
val currentGroup = groupingProjection(currentRow)
var currentBuffer = hashTable.get(currentGroup)
if (currentBuffer == null) {
currentBuffer = newAggregateBuffer()
hashTable.put(currentGroup.copy(), currentBuffer)
}
var i = 0
while (i < currentBuffer.length) {
currentBuffer(i).update(currentRow) // update the aggregate values for this group
i += 1
}
}
new Iterator[Row] {
private[this] val hashTableIter = hashTable.entrySet().iterator()
private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
private[this] val resultProjection =
new InterpretedMutableProjection(
resultExpressions, computedSchema ++ namedGroups.map(_._2))
private[this] val joinedRow = new JoinedRow4
override final def hasNext: Boolean = hashTableIter.hasNext
override final def next(): Row = {
val currentEntry = hashTableIter.next()
val currentGroup = currentEntry.getKey
val currentBuffer = currentEntry.getValue
var i = 0
while (i < currentBuffer.length) {
// Evaluating an aggregate buffer returns the result. No row is required since we
// already added all rows in the group using update.
aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
i += 1
}
resultProjection(joinedRow(aggregateResults, currentGroup)) // emit the result row for each group
}
}
}
}
}
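The grouped branch above is just a hash aggregation: one array of aggregate buffers per distinct grouping key, updated row by row, then evaluated once per group. Stripped of Catalyst expressions, the same idea looks like this (a standalone sketch with a running SUM as the only aggregate; not Spark code):

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

object HashAggregateSketch {
  def main(args: Array[String]): Unit = {
    // (dev_chnid, id) pairs standing in for the child's output rows.
    val rows = Seq(("chn-1", 10L), ("chn-2", 5L), ("chn-1", 7L), ("chn-2", 1L))

    // One mutable "aggregate buffer" (here just a running sum) per grouping key,
    // mirroring hashTable: HashMap[Row, Array[AggregateFunction]].
    val hashTable = new JHashMap[String, Array[Long]]()

    for ((key, id) <- rows) {
      var buffer = hashTable.get(key)
      if (buffer == null) {
        buffer = Array(0L)            // newAggregateBuffer()
        hashTable.put(key, buffer)
      }
      buffer(0) += id                 // AggregateFunction.update(currentRow)
    }

    // Iterate the table and "eval" each buffer, like the Iterator[Row] returned above.
    for (entry <- hashTable.entrySet().asScala) {
      println(s"group=${entry.getKey} SUM(id)=${entry.getValue()(0)}")
    }
  }
}
```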
The child of Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L] is Exchange (HashPartitioning 200), i.e.:
case class Exchange(
newPartitioning: Partitioning,
newOrdering: Seq[SortOrder],
child: SparkPlan)
extends UnaryNode {
protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
val part = new HashPartitioner(numPartitions)
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
}
} else {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) // rdd is the child's output; part defines the partitioning rule
if (newOrdering.nonEmpty) {
shuffled.setKeyOrdering(keyOrdering)
}
shuffled.setSerializer(serializer)
shuffled.map(_._2)
……
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
}
}
}
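Exchange pairs every row with its grouping key and redistributes the pairs by that key. In plain Spark core terms the redistribution can be sketched with a HashPartitioner; partitionBy builds the same kind of ShuffledRDD under the hood (200 partitions here only to mirror HashPartitioning 200 in the plan; the data is illustrative):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ExchangeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("exchange-sketch").setMaster("local[2]"))

    // (grouping key, row) pairs, like (hashExpressions(r), r) in Exchange.doExecute.
    val keyed = sc.parallelize(Seq(("chn-1", 10L), ("chn-2", 5L), ("chn-1", 7L)))

    // HashPartitioning 200 corresponds to a HashPartitioner(200);
    // partitionBy returns a ShuffledRDD keyed by the grouping expression.
    val shuffled = keyed.partitionBy(new HashPartitioner(200))

    println(shuffled.partitions.length)  // 200
    println(shuffled.toDebugString)      // shows the ShuffledRDD and its parent
    sc.stop()
  }
}
```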
The input to the ShuffledRDD returned by Exchange is the output of its child, namely Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L], the partial (map-side) aggregation, which follows the same pattern as above. Next let's look at the last SparkPlan in the tree, PhysicalRDD:
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
protected override def doExecute(): RDD[Row] = rdd // simply returns the rdd passed to the constructor
}
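PhysicalRDD is simply the leaf node that injects an already-materialized RDD[Row] into a physical plan. As a rough analogy (this sketches the createDataFrame path, not the Parquet path we trace next, and the schema is made up), wrapping your own RDD[Row] ends up being scanned by the same kind of leaf:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object PhysicalRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("physical-rdd-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    val rowRDD = sc.parallelize(Seq(Row("chn-1", 10L), Row("chn-2", 5L)))
    val schema = StructType(Seq(
      StructField("dev_chnid", StringType),
      StructField("id", LongType)))

    // createDataFrame wraps the RDD[Row] in a logical leaf that the planner
    // turns into a PhysicalRDD-style scan over the existing rows.
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.explain()

    sc.stop()
  }
}
```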
PhysicalRDD.doExecute just hands back the rdd it was constructed with, which by itself tells us very little, so let's trace where that rdd actually comes from. It is built by DataSourceStrategy:
private[sql] object DataSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
……
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
pruneFilterProject(
l,
projectList,
filters,
(a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
……
}
protected def pruneFilterProject(
relation: LogicalRelation,
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
pruneFilterProjectRaw(
relation,
projectList,
filterPredicates,
(requestedColumns, pushedFilters) => {
scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
})
}
protected def pruneFilterProjectRaw(
relation: LogicalRelation,
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {
val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition = filterPredicates.reduceLeftOption(expressions.And)
val pushedFilters = filterPredicates.map { _ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}
if (projectList.map(_.toAttribute) == projectList &&
projectSet.size == projectList.size &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val requestedColumns =
projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
.map(relation.attributeMap) // Match original case of attributes.
val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
scanBuilder(requestedColumns, pushedFilters))
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
val scan = createPhysicalRDD(relation.relation, requestedColumns,
scanBuilder(requestedColumns, pushedFilters))
execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
}
private[this] def createPhysicalRDD(
relation: BaseRelation,
output: Seq[Attribute],
rdd: RDD[Row]): SparkPlan = {
val converted = if (relation.needConversion) {
execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
} else {
rdd
}
execution.PhysicalRDD(output, converted)
}
}
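pruneFilterProject above is what decides which columns (requestedColumns) and which predicates (pushedFilters) actually reach a data source's buildScan. To make that contract concrete, here is a minimal relation built on the simpler public PrunedFilteredScan trait; this is a sketch of the general data source API, not of HadoopFsRelation or ParquetRelation2, and ToyRelation and its data are made up:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// A toy relation over an in-memory dataset. DataSourceStrategy hands buildScan only the
// columns the query needs (requiredColumns) and the predicates it could push down (filters).
class ToyRelation(override val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

  private val data = Seq(("chn-1", 10L), ("chn-2", 5L), ("chn-1", 7L))

  override def schema: StructType = StructType(Seq(
    StructField("dev_chnid", StringType),
    StructField("id", LongType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Filter pushdown: honour the predicates we recognise and ignore the rest;
    // Spark re-evaluates any filter the source does not fully handle.
    val minId = filters.collectFirst { case GreaterThan("id", v: Number) => v.longValue() }
      .getOrElse(Long.MinValue)

    // Column pruning: emit only the requested columns, in the requested order.
    val projected = data
      .filter { case (_, id) => id > minId }
      .map { case (chn, id) =>
        Row.fromSeq(requiredColumns.map {
          case "dev_chnid" => chn
          case "id"        => id
        })
      }
    sqlContext.sparkContext.parallelize(projected)
  }
}
```

In a real source a RelationProvider would create such a relation from user options; here it only demonstrates that buildScan sees exactly the pruned columns and pushable filters chosen by pruneFilterProject.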
In DataSourceStrategy, then, the rdd handed to createPhysicalRDD is essentially the return value of buildScan, and each kind of HadoopFsRelation supplies its own buildScan. In Spark 1.4.0 the HadoopFsRelation behind Parquet files is ParquetRelation2:
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
// This is for metastore conversion.
private val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function to set variable Parquet confs at both driver and executor side.
val initLocalJobFuncOpt =
ParquetRelation2.initializeLocalJobFunc(
requiredColumns,
filters,
dataSchema,
useMetadataCache,
parquetFilterPushDown) _
// Create the function to set input paths at the driver side.
val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
Utils.withDummyCallSite(sqlContext.sparkContext) {
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
// and footers. Especially when a global arbitrative schema (either from metastore or data
// source DDL) is available.
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[FilteringParquetRowInputFormat],
keyClass = classOf[Void],
valueClass = classOf[Row]) {
val cacheMetadata = useMetadataCache
@transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as '/'
// (which does happen in some S3N credentials), we need to use the string returned by the
// URI of the path to create a new Path.
val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
new FileStatus(
f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
}.toSeq
@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
}.toSeq
private def escapePathUserInfo(path: Path): Path = {
val uri = path.toUri
new Path(new URI(
uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
uri.getQuery, uri.getFragment))
}
// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
}
val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)
Array.tabulate[SparkPartition](rawSplits.size) { i =>
new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}.values
}
}
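buildScan above consults two SQL options: whether to cache Parquet metadata (FileStatus objects and footers) and whether to push filters into the Parquet reader. A small, illustrative driver that flips both switches and looks at the resulting plan (the config keys are the ones read above; the path, filter and object name are my own):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ParquetScanConfigSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parquet-scan-config").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    val parquetPath = args(0)  // same placeholder as the opening example

    // Both keys are the ones buildScan reads above.
    sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")   // cache FileStatus objects and footers
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")  // push supported filters into the Parquet reader

    val df = sqlContext.parquetFile(parquetPath)
    df.filter("id > 100").explain()  // inspect the resulting physical plan

    sc.stop()
  }
}
```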
ParquetRelation2.buildScan returns a SqlNewHadoopRDD. At this point the SQL statement has been turned into Spark's computation model, the RDD; what remains is for Spark to split the RDD lineage into stages at its shuffle dependencies and submit the tasks of each stage.
When the example query runs, the job triggered by collect is split into two stages, stage 0 and stage 1: the partial (map-side) aggregation runs in stage 0 and the global aggregation after the Exchange shuffle runs in stage 1, which is exactly what the Spark UI shows for this job.
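The shuffle introduced by Exchange is precisely what creates that stage boundary. The same two-stage shape can be reproduced with plain RDD code (an illustrative sketch, unrelated to the Parquet data above): a map-side combine followed by a shuffle and a final per-key combine:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TwoStageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stage-sketch").setMaster("local[2]"))

    val rows = sc.parallelize(Seq(("chn-1", 10L), ("chn-2", 5L), ("chn-1", 7L)), numSlices = 2)

    // reduceByKey combines partially within each partition (stage 0),
    // then shuffles and combines the partial sums per key (stage 1) --
    // the same partial/global split as the two Aggregate nodes in the plan.
    val sums = rows.reduceByKey(_ + _)

    println(sums.toDebugString)      // the ShuffledRDD marks the stage boundary
    sums.collect().foreach(println)  // running this job shows 2 stages in the Spark UI

    sc.stop()
  }
}
```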