Spark SQL Source Code Analysis, Part 1: Introduction

1.1 Demo

We start with a Spark SQL example:

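// Logger is assumed to be log4j's, which matches the log format shown further down.
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Configure is a project-specific configuration helper that is not shown here.
public class TestSparkSql { 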
    public static void main(String[] args) { 
        Logger log = Logger.getLogger(TestSparkSql.class); 
        System.setProperty("javax.xml.parsers.DocumentBuilderFactory", 
                "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"); 
        System.setProperty("javax.xml.parsers.SAXParserFactory", 
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); 
        String sparkMaster = Configure.instance.get("sparkMaster"); 
        String sparkJarAddress = Configure.instance.get("sparkJarAddress"); 
        String sparkExecutorMemory = Configure.instance.get("sparkExecutorMemory"); 
        String sparkCoresMax = Configure.instance.get("sparkCoresMax"); 
        String sparkLocalDir = Configure.instance.get("sparkLocalDir"); 
        log.info("initialize parameters"); 
        log.info("sparkMaster:" + sparkMaster); 
        log.info("sparkJarAddress:" + sparkJarAddress); 
        log.info("sparkExecutorMemory:" + sparkExecutorMemory); 
        log.info("sparkCoresMax:" + sparkCoresMax); 
        log.info("sparkLocalDir:" + sparkLocalDir); 
        SparkConf sparkConf = new SparkConf().setAppName("dse load application in Java"); 
        sparkConf.setMaster(sparkMaster); 
        if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) { 
            sparkConf.set("spark.executor.memory", sparkExecutorMemory); // 16g 
            sparkConf.set("spark.scheduler.mode", "FAIR"); 
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
            sparkConf.set("spark.kryo.registrator", "com.dahua.dse3.driver.dataset.DseKryoRegistrator"); 
            sparkConf.set("spark.cores.max", sparkCoresMax); 
            sparkConf.set("spark.akka.threads", "12"); 
            sparkConf.set("spark.local.dir", sparkLocalDir); 
            sparkConf.set("spark.shuffle.manager", "SORT"); 
            sparkConf.set("spark.network.timeout", "120"); 
            sparkConf.set("spark.rpc.lookupTimeout", "120"); 
            sparkConf.set("spark.executor.extraClassPath", "/usr/dahua/spark/executelib/hbase-protocol-0.98.3-hadoop2.jar"); 
            sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"); 
            sparkConf.set("spark.sql.codegen", "TRUE"); 
            //sparkConf.set("spark.sql.parquet.filterPushdown","true"); 
        } 
        JavaSparkContext jsc = new JavaSparkContext(sparkConf); 
        if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) { 
            jsc.addJar(sparkJarAddress); 
        } 
        String hdfsPath = "hdfs://mycluster/wl/parquet/test/2016-06-21"; 
        String source = "test"; 
        SQLContext sqlContext = new SQLContext(jsc); 
        DataFrame dataFrame = sqlContext.parquetFile(hdfsPath); 
        dataFrame.registerTempTable(source); 
        String sql = "SELECT id,dev_chnid,dev_chnname,car_num,car_speed,car_direct from test"; 
        DataFrame result = sqlContext.sql(sql); 
        log.info("Result:"+result.count()); 
    } 
} 

Calling result.count() makes the client submit a job for computation. Here is the key log output (the Spark source was modified to make this logging easier):

16-07-08 17:19:46,080 INFO  org.apache.spark.sql.SQLContext(Logging.scala:59) ## ----------------------parseSql start-------------------------- 
16-07-08 17:19:46,080 INFO  org.apache.spark.sql.SQLContext(Logging.scala:59) ##  
[SELECT id,dev_chnid,dev_chnname,car_num,car_speed,car_direct from test] 
16-07-08 17:19:46,728 INFO  org.apache.spark.sql.SQLContext(Logging.scala:59) ## ----------------------parseSql end  -------------------------- 
16-07-08 17:19:46,738 INFO  org.apache.spark.sql.SQLContext(Logging.scala:59) ##  
['Project ['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct] 
'UnresolvedRelation [test], None 
] 
…… 
16-07-08 17:29:28,651 INFO  org.apache.spark.scheduler.TaskSchedulerImpl(Logging.scala:59) ## Removed TaskSet 1.0, whose tasks have all completed, from pool default 
16-07-08 17:29:28,661 INFO  org.apache.spark.scheduler.DAGScheduler(Logging.scala:59) ## Job 0 finished: count at TestSparkSql.java:64, took 11.098610 s 
[== Parsed Logical Plan == 
Aggregate [COUNT(1) AS count#43L] 
 Project [id#0L,dev_chnid#26,dev_chnname#4,car_num#5,car_speed#8,car_direct#12] 
  Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] [email protected] 
== Analyzed Logical Plan == 
count: bigint 
Aggregate [COUNT(1) AS count#43L] 
 Project [id#0L,dev_chnid#26,dev_chnname#4,car_num#5,car_speed#8,car_direct#12] 
  Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] [email protected] 
== Optimized Logical Plan == 
Aggregate [COUNT(1) AS count#43L] 
 Project 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] [email protected] 
== Spark Plan == 
GeneratedAggregate false, [Coalesce(SUM(PartialCount#44L),0) AS count#43L], false 
 GeneratedAggregate true, [COUNT(1) AS PartialCount#44L], false 
  PhysicalRDD MapPartitionsRDD[1] at 
== Execute Plan == 
GeneratedAggregate false, [Coalesce(SUM(PartialCount#44L),0) AS count#43L], false 
 Exchange SinglePartition 
  GeneratedAggregate true, [COUNT(1) AS PartialCount#44L], false 
   PhysicalRDD MapPartitionsRDD[1] at 
Code Generation: true 
== RDD ==] 
16-07-08 17:29:28,686 INFO  com.dahua.dse3.TestSparkSql(TestSparkSql.java:64) ## Result:1489740 

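As the log shows, turning a SQL statement into an executable Spark RDD model goes through the following steps: the SQL text is parsed into an unresolved Logical Plan, the Analyzer resolves it into an analyzed Logical Plan, the Optimizer produces an optimized Logical Plan, the planner turns that into a Spark Plan, PrepareForExecution turns it into the executed plan, and executing that plan yields the RDD.

Before going further, here are the main classes in Spark SQL: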

1.2 SQLContext

The SQL context. It holds several classes that QueryExecution needs:

1.2.1 Catalog

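A map of <tableName, logicalPlan>. It is the directory used to look up relations: it registers tables, unregisters tables, and looks up tables and their logical plans.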

@transient 
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf) 
class SimpleCatalog(val conf: CatalystConf) extends Catalog { 
  val tables = new mutable.HashMap[String, LogicalPlan]() 
  override def registerTable( 
      tableIdentifier: Seq[String], 
      plan: LogicalPlan): Unit = { 
    // Normalize the case of the table identifier 
    val tableIdent = processTableIdentifier(tableIdentifier) 
    tables += ((getDbTableName(tableIdent), plan)) 
  } 
  override def unregisterTable(tableIdentifier: Seq[String]): Unit = { 
    val tableIdent = processTableIdentifier(tableIdentifier) 
    tables -= getDbTableName(tableIdent) 
  } 
  override def unregisterAllTables(): Unit = { 
    tables.clear() 
  } 
  override def tableExists(tableIdentifier: Seq[String]): Boolean = { 
    val tableIdent = processTableIdentifier(tableIdentifier) 
    tables.get(getDbTableName(tableIdent)) match { 
      case Some(_) => true 
      case None => false 
    } 
  } 
  override def lookupRelation( 
      tableIdentifier: Seq[String], 
      alias: Option[String] = None): LogicalPlan = { 
    val tableIdent = processTableIdentifier(tableIdentifier) 
    val tableFullName = getDbTableName(tableIdent) 
    // Look up the LogicalPlan by table name in `tables` (a mutable.HashMap[String, LogicalPlan]) 
    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName")) 
    val tableWithQualifiers = Subquery(tableIdent.last, table) 
    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are 
    // properly qualified with this alias. 
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) 
  } 
  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { 
    tables.map { 
      case (name, _) => (name, true) 
    }.toSeq 
  } 
  override def refreshTable(databaseName: String, tableName: String): Unit = { 
    throw new UnsupportedOperationException 
  } 
}
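The catalog idea can be sketched in a few lines of plain Java. This is only an illustration, not Spark code: the class name CatalogSketch is hypothetical, plans are represented as Strings instead of LogicalPlan objects, and the sketch assumes case-insensitive table names.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for SimpleCatalog: a map from table name to plan.
// The "plan" is a plain String here; the real code stores a LogicalPlan.
class CatalogSketch {
    private final Map<String, String> tables = new HashMap<>();

    // Assumption: identifiers are lower-cased (case-insensitive analysis).
    private static String normalize(String tableName) {
        return tableName.toLowerCase(Locale.ROOT);
    }

    void registerTable(String tableName, String plan) {
        tables.put(normalize(tableName), plan);
    }

    void unregisterTable(String tableName) {
        tables.remove(normalize(tableName));
    }

    boolean tableExists(String tableName) {
        return tables.containsKey(normalize(tableName));
    }

    // Wraps the stored plan in a "Subquery" label, as lookupRelation does above.
    String lookupRelation(String tableName) {
        String key = normalize(tableName);
        String plan = tables.get(key);
        if (plan == null) {
            throw new IllegalStateException("Table Not Found: " + key);
        }
        return "Subquery " + key + "\n " + plan;
    }

    public static void main(String[] args) {
        CatalogSketch catalog = new CatalogSketch();
        catalog.registerTable("test", "Relation[id, car_num]");
        System.out.println(catalog.lookupRelation("TEST"));
    }
}
```

In the demo, registerTempTable("test") plays the role of registerTable, and the later sqlContext.sql call relies on the lookup to find the plan behind the name test.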

1.2.2 SparkSQLParser

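Parses a SQL statement into a syntax tree and returns a Logical Plan. It first classifies the statement (cache, uncache, set, show, or others) and hands anything it does not recognize to the fallback parser.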

/** 
 * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL 
 * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser. 
 * 
 * @param fallback A function that parses an input string to a logical plan 
 */ 
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser { 
protected val AS = Keyword("AS") 
protected val CACHE = Keyword("CACHE") 
protected val CLEAR = Keyword("CLEAR") 
protected val IN = Keyword("IN") 
protected val LAZY = Keyword("LAZY") 
protected val SET = Keyword("SET") 
protected val SHOW = Keyword("SHOW") 
protected val TABLE = Keyword("TABLE") 
protected val TABLES = Keyword("TABLES") 
protected val UNCACHE = Keyword("UNCACHE") 
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others 
private lazy val cache: Parser[LogicalPlan] = 
  CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { 
    case isLazy ~ tableName ~ plan => 
      CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined) 
  } 
private lazy val uncache: Parser[LogicalPlan] = 
  ( UNCACHE ~ TABLE ~> ident ^^ { 
      case tableName => UncacheTableCommand(tableName) 
    } 
  | CLEAR ~ CACHE ^^^ ClearCacheCommand 
  ) 
private lazy val set: Parser[LogicalPlan] = 
  SET ~> restInput ^^ { 
    case input => SetCommandParser(input) 
  } 
private lazy val show: Parser[LogicalPlan] = 
  SHOW ~> TABLES ~ (IN ~> ident).? ^^ { 
    case _ ~ dbName => ShowTablesCommand(dbName) 
  } 
private lazy val others: Parser[LogicalPlan] = 
  wholeInput ^^ { 
    case input => fallback(input) 
  } 
}
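The "classify first, then fall back" structure can be sketched as follows. This is a simplified illustration under stated assumptions: plain prefix checks stand in for the parser combinators used above, plans are Strings, and the class name ParserDispatchSketch is hypothetical.

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical sketch: prefix checks replace the parser combinators of
// SparkSQLParser, and a plan is just a String naming the command.
class ParserDispatchSketch {
    private final Function<String, String> fallback;

    ParserDispatchSketch(Function<String, String> fallback) {
        this.fallback = fallback;
    }

    String parse(String sql) {
        String input = sql.trim();
        String upper = input.toUpperCase(Locale.ROOT);
        if (upper.startsWith("CACHE ")) {
            return "CacheTableCommand";
        }
        if (upper.startsWith("UNCACHE ")) {
            return "UncacheTableCommand";
        }
        if (upper.equals("CLEAR CACHE")) {
            return "ClearCacheCommand";
        }
        if (upper.equals("SET") || upper.startsWith("SET ")) {
            return "SetCommand";
        }
        if (upper.startsWith("SHOW TABLES")) {
            return "ShowTablesCommand";
        }
        // Everything else is handed to the dialect-specific parser.
        return fallback.apply(input);
    }

    public static void main(String[] args) {
        ParserDispatchSketch parser = new ParserDispatchSketch(s -> "Fallback(" + s + ")");
        System.out.println(parser.parse("SHOW TABLES"));
        System.out.println(parser.parse("SELECT id FROM test"));
    }
}
```

The SELECT statement from the demo matches none of the special commands, so it takes the last branch and is parsed by the fallback.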

1.2.3 Analyzer

The analyzer. It uses the Catalog and the FunctionRegistry to convert UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects. For example, it converts

'UnresolvedRelation [test], None

into

Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42][email protected]

class Analyzer( 
    catalog: Catalog, 
    registry: FunctionRegistry, 
    conf: CatalystConf, 
    maxIterations: Int = 100) 
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis { 
…… 
}

1.2.4 Optimizer

The optimizer, which further optimizes the Logical Plan.

object DefaultOptimizer extends Optimizer { 
  val batches = 
    // SubQueries are only needed for analysis and can be removed before execution. 
    Batch("Remove SubQueries", FixedPoint(100), 
      EliminateSubQueries) :: 
    Batch("Operator Reordering", FixedPoint(100), 
      UnionPushdown, 
      CombineFilters, 
      PushPredicateThroughProject, 
      PushPredicateThroughJoin, 
      PushPredicateThroughGenerate, 
      ColumnPruning, 
      ProjectCollapsing, 
      CombineLimits) :: 
    Batch("ConstantFolding", FixedPoint(100), 
    NullPropagation, 
      OptimizeIn, 
      ConstantFolding, 
      LikeSimplification, 
      BooleanSimplification, 
      SimplifyFilters, 
      SimplifyCasts, 
      SimplifyCaseConversionExpressions) :: 
    Batch("Decimal Optimizations", FixedPoint(100), 
      DecimalAggregates) :: 
    Batch("LocalRelation", FixedPoint(100), 
      ConvertToLocalRelation) :: Nil 
}
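Each batch above runs with FixedPoint(100), meaning its rules are applied repeatedly until the plan stops changing or 100 iterations are reached. A minimal sketch of that loop, assuming plans are Strings and rules are simple string rewrites (the class name FixedPointSketch and the NOT NOT rule are made up for illustration):

```java
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a fixed-point rule loop; not Spark's RuleExecutor.
class FixedPointSketch {
    static String execute(String plan, List<UnaryOperator<String>> rules, int maxIterations) {
        String current = plan;
        for (int i = 0; i < maxIterations; i++) {
            String next = current;
            // Apply every rule of the batch once, in order.
            for (UnaryOperator<String> rule : rules) {
                next = rule.apply(next);
            }
            // Fixed point reached: another pass would change nothing.
            if (next.equals(current)) {
                return current;
            }
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        // Illustrative rule: remove one double negation per pass.
        UnaryOperator<String> removeDoubleNot = s -> s.replaceFirst("NOT NOT ", "");
        List<UnaryOperator<String>> batch = Collections.singletonList(removeDoubleNot);
        System.out.println(execute("NOT NOT NOT NOT a > 1", batch, 100));
    }
}
```

The example rule needs two passes to remove both double negations, which is why a single application of each rule is not enough.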

 
For example:

CombineFilters: recursively merges two adjacent filters. For example, it converts

Filter(a>1)
 Filter(b>1)
  Project……

into

Filter(a>1 AND b>1)
 Project……
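A rough sketch of the CombineFilters idea on a toy plan tree. The classes here are hypothetical, conditions are plain Strings, and the order in which the two conditions are joined is an assumption of this sketch rather than a statement about Spark's rule.

```java
// Hypothetical toy plan tree; not Catalyst's classes.
class CombineFiltersSketch {
    static abstract class Plan {}

    static final class Relation extends Plan {
        final String name;
        Relation(String name) { this.name = name; }
        @Override public String toString() { return "Relation(" + name + ")"; }
    }

    static final class Filter extends Plan {
        final String condition;
        final Plan child;
        Filter(String condition, Plan child) {
            this.condition = condition;
            this.child = child;
        }
        @Override public String toString() { return "Filter(" + condition + ", " + child + ")"; }
    }

    // Recursively merges a Filter whose child is also a Filter into one Filter.
    static Plan combineFilters(Plan plan) {
        if (!(plan instanceof Filter)) {
            return plan;
        }
        Filter outer = (Filter) plan;
        Plan child = combineFilters(outer.child);
        if (child instanceof Filter) {
            Filter inner = (Filter) child;
            return new Filter(outer.condition + " AND " + inner.condition, inner.child);
        }
        return new Filter(outer.condition, child);
    }

    public static void main(String[] args) {
        Plan plan = new Filter("a > 1", new Filter("b > 1", new Relation("t")));
        System.out.println(combineFilters(plan));
    }
}
```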

CombineLimits: merges two adjacent limits. For example, select * from (select * from c_picrecord limit 100)a limit 10

is optimized to:

Limit if ((100 < 10)) 100 else 10
Relation[id#0L,dev_id#1,dev_chnnum#2L,de……
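The merged limit expression above simply keeps the smaller of the two limits. A tiny sketch of that arithmetic (the class and method names are hypothetical):

```java
// Hypothetical helper mirroring the expression `if (inner < outer) inner else outer`.
class CombineLimitsSketch {
    static int combine(int innerLimit, int outerLimit) {
        return innerLimit < outerLimit ? innerLimit : outerLimit;
    }

    public static void main(String[] args) {
        // Inner limit 100, outer limit 10, as in the query above.
        System.out.println(combine(100, 10));
    }
}
```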

1.2.5 SparkPlanner

Converts a LogicalPlan into a SparkPlan.

protected[sql] class SparkPlanner extends SparkStrategies { 
  val sparkContext: SparkContext = self.sparkContext 
  val sqlContext: SQLContext = self 
  def codegenEnabled: Boolean = self.conf.codegenEnabled 
  def unsafeEnabled: Boolean = self.conf.unsafeEnabled 
  def numPartitions: Int = self.conf.numShufflePartitions 
  def strategies: Seq[Strategy] = 
    experimental.extraStrategies ++ ( 
    DataSourceStrategy :: 
    DDLStrategy :: 
    TakeOrdered :: 
    HashAggregation :: 
    LeftSemiJoin :: 
    HashJoin :: 
    InMemoryScans :: 
      ParquetOperations :: 
      BasicOperators :: 
      CartesianProduct :: 
      BroadcastNestedLoopJoin :: Nil) 
 }

For example, the plan

Subquery test

Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42][email protected]

is matched by this case in DataSourceStrategy:

// Scanning non-partitioned HadoopFsRelation 
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>

which converts it into a PhysicalRDD.

1.2.6 PrepareForExecution

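Inserts shuffle operations into the SparkPlan. If two consecutive SparkPlan nodes have different outputPartitioning, a shuffle has to be inserted between them. Take an aggregate function: it first aggregates locally (per partition) and then globally, and because the local and global aggregations are partitioned differently, a shuffle is needed in between.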

/** 
 * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. 
 */ 
@transient 
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { 
  val batches = 
    Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil 
}

For example,

GeneratedAggregate false, [Coalesce(SUM(PartialCount#44L),0) AS count#43L], false
 GeneratedAggregate true, [COUNT(1) AS PartialCount#44L], false
  PhysicalRDD MapPartitionsRDD[1]

becomes, after PrepareForExecution:

GeneratedAggregate false, [Coalesce(SUM(PartialCount#44L),0) AS count#43L], false
 Exchange SinglePartition
  GeneratedAggregate true, [COUNT(1) AS PartialCount#44L], false
   PhysicalRDD MapPartitionsRDD[1]
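The partial-then-final aggregation around the Exchange can be sketched without Spark. In this illustration partitions are plain lists, the exchange step just gathers the partial results into one list, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a two-phase COUNT; not Spark's operators.
class TwoPhaseCountSketch {
    // Phase 1 (partial aggregation): COUNT(1) inside each partition.
    static List<Long> partialCounts(List<List<String>> partitions) {
        List<Long> counts = new ArrayList<>();
        for (List<String> partition : partitions) {
            counts.add((long) partition.size());
        }
        return counts;
    }

    // Stand-in for "Exchange SinglePartition": gather all partial results in one place.
    static List<Long> exchangeToSinglePartition(List<Long> partials) {
        return new ArrayList<>(partials);
    }

    // Phase 2 (final aggregation): SUM of the partial counts, 0 if there are none.
    static long finalCount(List<Long> singlePartition) {
        long total = 0L;
        for (long count : singlePartition) {
            total += count;
        }
        return total;
    }

    static long count(List<List<String>> partitions) {
        return finalCount(exchangeToSinglePartition(partialCounts(partitions)));
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("row1", "row2"),
                Arrays.asList("row3"));
        System.out.println(count(partitions));
    }
}
```

The partial counts live in as many partitions as the input, while the final sum needs them all in one partition. That mismatch is what makes the shuffle necessary.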

1.3 QueryExecution

The execution environment for a SQL statement.

protected[sql] class QueryExecution(val logical: LogicalPlan) { // logical contains Aggregate(groupingExprs, aggregates, df.logicalPlan) 
  def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) 
  lazy val analyzed: LogicalPlan = analyzer.execute(logical) 
  lazy val withCachedData: LogicalPlan = { 
    assertAnalyzed() 
    cacheManager.useCachedData(analyzed) 
  } 
  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) // the optimized LogicalPlan 
  // TODO: Don't just pick the first one... 
  lazy val sparkPlan: SparkPlan = { 
    SparkPlan.currentContext.set(self) 
    // SparkPlanner converts the LogicalPlan into a SparkPlan 
    // 1.4.1 takes the first plan returned; the first strategy is DataSourceStrategy 
    planner.plan(optimizedPlan).next() 
  } 
  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) 
  lazy val toRdd: RDD[Row] = { 
    toString 
    executedPlan.execute() 
  } 
  protected def stringOrError[A](f: => A): String = 
    try f.toString catch { case e: Throwable => e.toString } 
  def simpleString: String = 
    s"""== Physical Plan == 
       |${stringOrError(executedPlan)} 
    """.stripMargin.trim 
  // TODO: how this gets printed 
  override def toString: String = { 
    def output = 
      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") 
    // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) 
    // however, the `toRdd` will cause the real execution, which is not what we want. 
    // We need to think about how to avoid the side effect. 
    s"""== Parsed Logical Plan == 
       |${stringOrError(logical)} 
       |== Analyzed Logical Plan == 
       |${stringOrError(output)} 
       |${stringOrError(analyzed)} 
       |== Optimized Logical Plan == 
       |${stringOrError(optimizedPlan)} 
       |== Physical Plan == 
       |${stringOrError(executedPlan)} 
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} 
       |== RDD == 
    """.stripMargin.trim 
  } 
}

The one thing to note here is that analyzed, optimizedPlan, sparkPlan, and executedPlan are all lazy vals: their code runs only when the value is actually needed, and nothing happens until then.
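Java has no lazy val, but the behaviour can be imitated with a memoizing supplier. The sketch below is only an analogy: the Lazy helper and the String "plans" are hypothetical, and unlike Scala's lazy val this version is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical imitation of the lazy stages in QueryExecution.
class LazyStagesSketch {
    // Evaluates the supplier at most once, on first access. Not thread-safe.
    static final class Lazy<T> {
        private final Supplier<T> init;
        private T value;
        private boolean evaluated;

        Lazy(Supplier<T> init) { this.init = init; }

        T get() {
            if (!evaluated) {
                value = init.get();
                evaluated = true;
            }
            return value;
        }
    }

    // Records the order in which stages are actually computed.
    final List<String> trace = new ArrayList<>();
    final Lazy<String> analyzed;
    final Lazy<String> optimizedPlan;
    final Lazy<String> sparkPlan;
    final Lazy<String> executedPlan;

    LazyStagesSketch(String logical) {
        analyzed = new Lazy<>(() -> stage("analyzed", "Analyzed", logical));
        optimizedPlan = new Lazy<>(() -> stage("optimizedPlan", "Optimized", analyzed.get()));
        sparkPlan = new Lazy<>(() -> stage("sparkPlan", "Planned", optimizedPlan.get()));
        executedPlan = new Lazy<>(() -> stage("executedPlan", "Executed", sparkPlan.get()));
    }

    private String stage(String name, String label, String input) {
        trace.add(name);
        return label + "(" + input + ")";
    }

    public static void main(String[] args) {
        LazyStagesSketch query = new LazyStagesSketch("Project");
        System.out.println(query.trace);              // nothing computed yet
        System.out.println(query.executedPlan.get()); // forces the whole chain
        System.out.println(query.trace);
    }
}
```

Asking for executedPlan pulls in sparkPlan, which pulls in optimizedPlan, and so on. This is the same chain that result.count() sets off in the demo.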

1.4 LogicalPlan and SparkPlan

LogicalPlan and SparkPlan both extend QueryPlan, which is a generic class:

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] { 
} 
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { 
} 
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { 
}

All of these are abstract classes. Different kinds of tree nodes are then derived from them:

/** 
 * A logical plan node with no children (a leaf node). 
 */ 
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { 
  self: Product => 
} 
/** 
 * A logical plan node with single child (a unary node). 
 */ 
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { 
  self: Product => 
} 
/** 
 * A logical plan node with a left and right child (a binary node). 
 */ 
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] { 
  self: Product => 
}

// Leaf node: no children 
private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { 
  self: Product => 
} 
// Unary node: one child 
private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] { 
  self: Product => 
} 
// Binary node: a left and a right child 
private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] { 
  self: Product => 
}
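The three node shapes can be sketched as a small Java class hierarchy. The classes below are hypothetical and only model the child structure. The treeString method indents children by one space per level, similar to the plan dumps in the log.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical toy version of leaf, unary, and binary tree nodes.
class PlanTreeSketch {
    static abstract class Node {
        final String name;

        Node(String name) { this.name = name; }

        abstract List<Node> children();

        // One node per line, each level indented by one more space.
        String treeString() {
            StringBuilder sb = new StringBuilder();
            append(sb, 0);
            return sb.toString();
        }

        private void append(StringBuilder sb, int depth) {
            for (int i = 0; i < depth; i++) {
                sb.append(' ');
            }
            sb.append(name).append('\n');
            for (Node child : children()) {
                child.append(sb, depth + 1);
            }
        }
    }

    static final class Leaf extends Node {
        Leaf(String name) { super(name); }
        @Override List<Node> children() { return Collections.emptyList(); }
    }

    static final class Unary extends Node {
        final Node child;
        Unary(String name, Node child) { super(name); this.child = child; }
        @Override List<Node> children() { return Collections.singletonList(child); }
    }

    static final class Binary extends Node {
        final Node left;
        final Node right;
        Binary(String name, Node left, Node right) {
            super(name);
            this.left = left;
            this.right = right;
        }
        @Override List<Node> children() { return Arrays.asList(left, right); }
    }

    public static void main(String[] args) {
        Node plan = new Unary("Aggregate", new Unary("Project", new Leaf("Relation")));
        System.out.print(plan.treeString());
    }
}
```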

The concrete classes of each are subclasses of:

abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { 
  self: Product => 
}

abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] { 
  self: Product => 
}

abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] { 
  self: Product => 
}

private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { 
  self: Product => 
}

private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] { 
  self: Product => 
}

private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] { 
  self: Product => 
}

As you can see, a tree structure of leaf, unary, and binary nodes runs through the whole parsing process in Spark SQL.

 

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/9313.html
