Spark SQL Source Code Analysis, Part 3. Analyzer: Unresolved Logical Plan -> Analyzed Logical Plan

The Analyzer's main responsibility is to resolve the logical plan that the SQL parser leaves unresolved.

lazy val analyzed: LogicalPlan = analyzer.execute(logical) // the analyzed LogicalPlan 
protected[sql] lazy val analyzer: Analyzer = 
  new Analyzer(catalog, functionRegistry, conf) { 
    override val extendedResolutionRules = 
      ExtractPythonUdfs :: 
      sources.PreInsertCastAndRename :: 
      Nil 
    override val extendedCheckRules = Seq( 
      sources.PreWriteCheck(catalog) 
    ) 
  } 
class Analyzer( 
    catalog: Catalog, 
    registry: FunctionRegistry, 
    conf: CatalystConf, 
    maxIterations: Int = 100) 
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis { 
  def resolver: Resolver = { 
    if (conf.caseSensitiveAnalysis) { 
      caseSensitiveResolution 
    } else { 
      caseInsensitiveResolution 
    } 
  } 
 
  val fixedPoint = FixedPoint(maxIterations) 
 
  /** 
   * Override to provide additional rules for the "Resolution" batch. 
   */ 
  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil 
 
  lazy val batches: Seq[Batch] = Seq( // each Batch is a different group of rules 
    Batch("Substitution", fixedPoint, 
      CTESubstitution :: 
      WindowsSubstitution :: 
      Nil : _*), 
    Batch("Resolution", fixedPoint, 
      // resolve table names through the catalog 
      ResolveRelations :: 
      // resolve attributes produced by child operators, typically qualified references such as a.id 
      ResolveReferences :: 
      ResolveGroupingAnalytics :: 
      // ORDER BY columns that are not listed in the SELECT still have to be fetched, then projected away after the sort 
      ResolveSortReferences :: 
      ResolveGenerate :: 
      // resolve functions 
      ResolveFunctions :: 
      ExtractWindowExpressions :: 
      // resolve global aggregates, e.g. select sum(score) from table 
      GlobalAggregates :: 
      // resolve aggregate filters in a HAVING clause, e.g. having sum(score) > 400 
      UnresolvedHavingClauseAttributes :: 
      TrimGroupingAliases :: 
      // typeCoercionRules are Hive's type coercion rules 
      typeCoercionRules ++ 
      extendedResolutionRules : _*) 
  ) 
… 
}
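As the SQLContext override above shows, extendedResolutionRules is the hook for plugging extra rules into the Resolution batch. As a rough illustration of that extension point, here is a hypothetical rule (RemoveTrivialFilter is not part of Spark; Spark 1.3/1.4-era Catalyst APIs are assumed):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: drop Filter nodes whose condition is the literal true.
object RemoveTrivialFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// It would be plugged in the same way ExtractPythonUdfs is above:
// override val extendedResolutionRules = RemoveTrivialFilter :: Nil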

In val analyzed: LogicalPlan = analyzer.execute(logical), `logical` is the unresolved logical plan produced by the SQL parser, and `analyzed` is the analyzed logical plan. So what exactly does execute do?

def execute(plan: TreeType): TreeType = { 
  var curPlan = plan 
  batches.foreach { batch => // process each Batch in turn 
    val batchStartPlan = curPlan 
    var iteration = 1 
    var lastPlan = curPlan 
    var continue = true 
    // Run until fix point (or the max number of iterations as specified in the strategy). 
    while (continue) { // the batch is considered done only when applying all of its rules leaves the plan unchanged; as long as the plan keeps changing, run the rules again 
      // foldLeft walks the rule collection from the left and accumulates to the right; foldRight would do the opposite 
      curPlan = batch.rules.foldLeft(curPlan) { 
        case (plan, rule) => 
          val result = rule(plan) // apply rule.apply to the plan, transforming the TreeNodes inside it 
          logInfo(s"plan (${plan}) \n result (${result}) \n rule (${rule})") // logging added so the result of applying each rule can be seen; it is used in the examples below 
          if (!result.fastEquals(plan)) { 
            logTrace( 
              s""" 
                |=== Applying Rule ${rule.ruleName} === 
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")} 
              """.stripMargin) 
          } 
          result 
      } 
      iteration += 1 
      if (iteration > batch.strategy.maxIterations) { 
        // Only log if this is a rule that is supposed to run more than once. 
        if (iteration != 2) { 
          logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}") 
        } 
        continue = false 
      } 
      if (curPlan.fastEquals(lastPlan)) { 
        logTrace( 
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") 
        continue = false 
      } 
      lastPlan = curPlan 
    } 
    if (!batchStartPlan.fastEquals(curPlan)) { 
      logDebug( 
        s""" 
        |=== Result of Batch ${batch.name} === 
        |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")} 
      """.stripMargin) 
    } else { 
      logTrace(s"Batch ${batch.name} has no effect.") 
    } 
  } 
  curPlan 
}
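To make the foldLeft and fixed-point behavior concrete, here is a standalone sketch in plain Scala (toy types, not Catalyst): the "plan" is just a string, each "rule" is a function, and the loop stops once a full pass over the rules no longer changes the plan.

object FixedPointDemo {
  type Plan = String
  type Rule = Plan => Plan

  // Toy rules: each one rewrites the "plan" if it still contains an unresolved marker.
  val rules: Seq[Rule] = Seq(
    p => p.replace("'UnresolvedRelation", "Relation"),
    p => p.replace("'Filter", "Filter")
  )

  def execute(plan: Plan, maxIterations: Int = 100): Plan = {
    var curPlan = plan
    var lastPlan = plan
    var iteration = 1
    var continue = true
    while (continue) {
      // One pass: thread the plan through every rule, left to right.
      curPlan = rules.foldLeft(curPlan) { case (p, rule) => rule(p) }
      iteration += 1
      if (curPlan == lastPlan || iteration > maxIterations) continue = false
      lastPlan = curPlan
    }
    curPlan
  }

  def main(args: Array[String]): Unit = {
    println(execute("'Filter ('id > 1) 'UnresolvedRelation [test]"))
    // prints: Filter ('id > 1) Relation [test]
  }
}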

The key call is this one:

val result = rule(plan) // apply rule.apply to the plan, transforming the TreeNodes inside it

rule(plan) invokes the apply method of the corresponding Rule[LogicalPlan] object, for example ResolveRelations or ResolveReferences:

object ResolveRelations extends Rule[LogicalPlan] { 
  def getTable(u: UnresolvedRelation): LogicalPlan = { 
    try { 
      catalog.lookupRelation(u.tableIdentifier, u.alias) 
    } catch { 
      case _: NoSuchTableException => 
        u.failAnalysis(s"no such table ${u.tableName}") 
    } 
  } 
  // takes a logical plan and returns a logical plan; transform walks every node and applies the rule to each of them 
  def apply(plan: LogicalPlan): LogicalPlan = plan transform { // transform delegates to transformDown, which is essentially a pre-order traversal of the tree 
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => 
      i.copy(table = EliminateSubQueries(getTable(u))) 
    case u: UnresolvedRelation => 
      getTable(u) 
  } 
}
 
object ResolveReferences extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // transformUp is essentially a post-order traversal of the tree 
    case p: LogicalPlan if !p.childrenResolved => p 
 
    // If the projection list contains Stars, expand it. 
    case p @ Project(projectList, child) if containsStar(projectList) => 
      Project( 
        projectList.flatMap { 
          case s: Star => s.expand(child.output, resolver) 
          case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) => 
            val expandedArgs = args.flatMap { 
              case s: Star => s.expand(child.output, resolver) 
              case o => o :: Nil 
            } 
            Alias(child = f.copy(children = expandedArgs), name)() :: Nil 
          case Alias(c @ CreateArray(args), name) if containsStar(args) => 
            val expandedArgs = args.flatMap { 
              case s: Star => s.expand(child.output, resolver) 
              case o => o :: Nil 
            } 
            Alias(c.copy(children = expandedArgs), name)() :: Nil 
          …… 
}

The difference between the two traversal orders works as follows.
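Here is a small sketch in plain Scala (a toy tree rather than Catalyst TreeNodes): preOrder corresponds to transform/transformDown, postOrder to transformUp.

object TraversalDemo {
  sealed trait Node
  case class Branch(left: Node, right: Node) extends Node
  case class Leaf(name: String) extends Node

  // Pre-order: visit the node itself, then its children (this is what transform/transformDown does).
  def preOrder(n: Node)(visit: Node => Unit): Unit = {
    visit(n)
    n match {
      case Branch(l, r) => preOrder(l)(visit); preOrder(r)(visit)
      case _: Leaf =>
    }
  }

  // Post-order: visit the children first, then the node itself (this is what transformUp does).
  def postOrder(n: Node)(visit: Node => Unit): Unit = {
    n match {
      case Branch(l, r) => postOrder(l)(visit); postOrder(r)(visit)
      case _: Leaf =>
    }
    visit(n)
  }

  def main(args: Array[String]): Unit = {
    val tree = Branch(Leaf("a"), Leaf("b"))
    preOrder(tree)(println)  // Branch(Leaf(a),Leaf(b)), Leaf(a), Leaf(b)
    postOrder(tree)(println) // Leaf(a), Leaf(b), Branch(Leaf(a),Leaf(b))
  }
}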

Next, several representative Rule[LogicalPlan] implementations are examined.

3.1 ResolveRelations

Resolves an UnresolvedRelation into a resolved relation.
object ResolveRelations extends Rule[LogicalPlan] { 
  def getTable(u: UnresolvedRelation): LogicalPlan = { 
    try { 
      catalog.lookupRelation(u.tableIdentifier, u.alias) 
    } catch { 
      case _: NoSuchTableException => 
        u.failAnalysis(s"no such table ${u.tableName}") 
    } 
  } 
  // takes a logical plan and returns a logical plan; transform walks every node and applies the rule to each of them 
  def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => 
      i.copy(table = EliminateSubQueries(getTable(u))) 
    case u: UnresolvedRelation => // when an UnresolvedRelation is hit, look up in the catalog which actual relation the table name refers to 
      getTable(u) 
  } 
} 
The relation behind a table name is registered when dataFrame.registerTempTable(source) is called: 
dataFrame.registerTempTable(source)

Let's look at dataFrame.registerTempTable:

/** 
 * Registers this [[DataFrame]] as a temporary table using the given name.  The lifetime of this 
 * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame. 
 * 
 * @group basic 
 * @since 1.3.0 
 */ 
def registerTempTable(tableName: String): Unit = { 
  sqlContext.registerDataFrameAsTable(this, tableName) 
} 
/** 
 * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist 
 * only during the lifetime of this instance of SQLContext. 
 */ 
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { 
  catalog.registerTable(Seq(tableName), df.logicalPlan) // one table name maps to one logicalPlan 
}
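A rough sketch of what the catalog does with that (tableName, logicalPlan) pair; this is a simplified, hypothetical stand-in for Spark's real catalog, but it shows why ResolveRelations later produces a Subquery wrapping the relation:

import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

// Simplified, hypothetical stand-in for the real catalog: a table name maps to one logical plan.
class ToyCatalog {
  private val tables = mutable.HashMap.empty[String, LogicalPlan]

  def registerTable(name: String, plan: LogicalPlan): Unit = tables(name) = plan

  // Wrapping the stored plan in a Subquery is why the analyzed plan below shows "Subquery test".
  def lookupRelation(name: String): LogicalPlan =
    Subquery(name, tables(name))
}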

And that logicalPlan is exactly the logicalPlan held by the DataFrame:

DataFrame dataFrame = sqlContext.parquetFile(hdfsPath) // this is where the DataFrame's logicalPlan comes from 
def parquetFile(paths: String*): DataFrame = { 
  if (paths.isEmpty) { 
    emptyDataFrame 
  } else if (conf.parquetUseDataSourceApi) { // this is the branch taken by default 
    read.parquet(paths : _*) 
  } else { 
    DataFrame(this, parquet.ParquetRelation( 
      paths.mkString(","), Some(sparkContext.hadoopConfiguration), this)) 
  } 
} 
def parquet(paths: String*): DataFrame = { 
  if (paths.isEmpty) { 
    sqlContext.emptyDataFrame 
  } else { 
    val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray 
    sqlContext.baseRelationToDataFrame( 
      new ParquetRelation2( 
        globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext)) // what finally gets built is a ParquetRelation2 
  } 
}
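Putting the pieces together, the test table used in the log output below can be set up roughly as follows (a sketch; the application name and hdfsPath are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("analyzer-demo"))
val sqlContext = new SQLContext(sc)

val hdfsPath = "hdfs://..."                       // placeholder for a real Parquet path
val dataFrame = sqlContext.parquetFile(hdfsPath)  // its logicalPlan wraps a ParquetRelation2
dataFrame.registerTempTable("test")               // the catalog now maps "test" -> that logicalPlan

val df = sqlContext.sql("SELECT * from test")
df.explain(true)                                  // prints the parsed, analyzed, optimized and physical plans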

Now let's look at the log output:

plan-> 
'Sort ['car_num ASC], false 
 'Aggregate ['dev_chnid], ['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct] 
  'Filter ('id > 1) 
   'UnresolvedRelation [test], None 
 
result-> 
'Sort ['car_num ASC], false 
 'Aggregate ['dev_chnid], ['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct] 
  'Filter ('id > 1) 
   Subquery test 
 Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
 
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$@51db8cdb

After applying the ResolveRelations rule, UnresolvedRelation [test], None is resolved into:

Subquery test

Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42]

3.2 ResolveReferences

Resolves the output attributes of each node; every LogicalPlan's output is a set of columns. For example, when select * appears, the * must be expanded into the concrete list of columns it stands for.

object ResolveReferences extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { 
    case p: LogicalPlan if !p.childrenResolved => p 
 
    // If the projection list contains Stars, expand it. 
    case p @ Project(projectList, child) if containsStar(projectList) => // if the projection list contains *, expand it 
      Project( 
        projectList.flatMap { 
          case s: Star => s.expand(child.output, resolver) 
          case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) => 
            val expandedArgs = args.flatMap { 
              case s: Star => s.expand(child.output, resolver) 
              case o => o :: Nil 
            } 
            Alias(child = f.copy(children = expandedArgs), name)() :: Nil 
          case Alias(c @ CreateArray(args), name) if containsStar(args) => 
            val expandedArgs = args.flatMap { 
              case s: Star => s.expand(child.output, resolver) 
              case o => o :: Nil 
            } 
            Alias(c.copy(children = expandedArgs), name)() :: Nil 
          case Alias(c @ CreateStruct(args), name) if containsStar(args) => 
            val expandedArgs = args.flatMap { 
              case s: Star => s.expand(child.output, resolver) 
              case o => o :: Nil 
            } 
            Alias(c.copy(children = expandedArgs), name)() :: Nil 
          case o => o :: Nil 
        }, 
        child) 
    case t: ScriptTransformation if containsStar(t.input) => 
      t.copy( 
        input = t.input.flatMap { 
          case s: Star => s.expand(t.child.output, resolver) 
          case o => o :: Nil 
        } 
      ) 
    …… 
}
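Conceptually, Star.expand just replaces the single * entry with the child's output attributes. A toy illustration with plain strings (not the Catalyst implementation):

// Toy model: a projection is a list of column names, with "*" standing in for a Star.
def expandStar(projectList: Seq[String], childOutput: Seq[String]): Seq[String] =
  projectList.flatMap {
    case "*"   => childOutput    // expand * into every output column of the child
    case other => Seq(other)
  }

expandStar(Seq("*"), Seq("id", "dev_chnid", "car_num"))
// -> Seq("id", "dev_chnid", "car_num"), mirroring how Project [*] becomes Project [id#0L, ...] in the log below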

For example, with the following SQL statement:

String sql = "SELECT * from test ";

the log output is:

plan-> 
'Project [*] 
 Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
 
result-> 
Project [id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] // the * has been expanded into concrete columns 
 Subquery test 
  Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$@...

3.3 ResolveSortReferences

In a SELECT statement, the ORDER BY columns are often not listed in the projection; they still have to be fetched so the sort can run, and are projected away afterwards. Similarly, when an aggregation and a sort appear together and a sort column is not among the aggregate expressions, that column has to be added to the aggregate as well:

object ResolveSortReferences extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { 
    case s @ Sort(ordering, global, p @ Project(projectList, child)) 
        if !s.resolved && p.resolved => 
      val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, p, child) 
 
      // If this rule was not a no-op, return the transformed plan, otherwise return the original. 
      if (missing.nonEmpty) { 
        // Add missing attributes and then project them away after the sort. 
        Project(p.output, 
          Sort(resolvedOrdering, global, 
            Project(projectList ++ missing, child))) // add the ORDER BY columns missing from p's output to the projection 
      } else { 
        logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}") 
        s // Nothing we can do here. Return original plan. 
      } 
    case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child)) 
        if !s.resolved && a.resolved => 
      val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name }) 
      // A small hack to create an object that will allow us to resolve any references that 
      // refer to named expressions that are present in the grouping expressions. 
      val groupingRelation = LocalRelation( 
        grouping.collect { case ne: NamedExpression => ne.toAttribute } 
      ) 
 
      val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, a, groupingRelation) 
 
      if (missing.nonEmpty) { 
        // Add missing grouping exprs and then project them away after the sort. 
        Project(a.output, 
          Sort(resolvedOrdering, global, 
            Aggregate(grouping, aggs ++ missing, child))) // add the ORDER BY columns missing from the aggregate expressions to the aggregate 
      } else { 
        s // Nothing we can do here. Return original plan. 
      } 
  }

For example, with the following SQL statement:

String sql = "SELECT dev_chnid from test order by id";

the log output is:

plan-> 
'Sort ['id ASC], true // id does not appear in the Project below 
 Project [dev_chnid#26] 
  Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
result-> 
Project [dev_chnid#26] 
 Sort [id#0L ASC], true 
  Project [dev_chnid#26,id#0L] // id is fetched together with dev_chnid first 
   Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSortReferences$@...

3.4 ResolveFunctions

Resolves UDFs (user-defined functions). Spark SQL lets users define and register their own functions to process data. Anyone who has used Spark SQL has probably run into how few functions it ships with; beyond the basics there is not much available, which rarely covers a real project's needs. UDFs solve that problem.

So how is a user-defined function used? First, a code sample:

SQLContext sqlContext = new SQLContext(jsc); 
UDFRegistration udfRegistration = new UDFRegistration(sqlContext); // registration goes through UDFRegistration 
DataFrame dataFrame = sqlContext.parquetFile(hdfsPath); 
dataFrame.registerTempTable(source); 
udfRegistration.register("strlength", new UDF1<String, Integer>() { 
    @Override 
    public Integer call(String str) throws Exception { 
        return (Integer)str.length(); 
    } 
}, DataType.fromCaseClassString("IntegerType")); // returns the length of the given string 
String sql = "SELECT strlength(dev_chnid) from test"; 
DataFrame result = sqlContext.sql(sql);
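For comparison, the same registration from Scala goes through sqlContext.udf, which is backed by the same UDFRegistration (a sketch, assuming the sqlContext and test table from above):

// Register a one-argument UDF from Scala; the return type is inferred.
sqlContext.udf.register("strlength", (s: String) => s.length)

val result = sqlContext.sql("SELECT strlength(dev_chnid) from test")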
A user registers a custom function for a given argument type through UDFRegistration. So how does ResolveFunctions resolve it? Read on:
object ResolveFunctions extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
    case q: LogicalPlan => 
      q transformExpressions { 
        case u @ UnresolvedFunction(name, children) if u.childrenResolved => 
          registry.lookupFunction(name, children) // look the function up in the registry 
      } 
  } 
} 
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(conf) 
class SimpleFunctionRegistry(val conf: CatalystConf) extends FunctionRegistry { 
  val functionBuilders = StringKeyHashMap[FunctionBuilder](conf.caseSensitiveAnalysis) 
  override def registerFunction(name: String, builder: FunctionBuilder): Unit = { 
    functionBuilders.put(name, builder) 
  } 
  override def lookupFunction(name: String, children: Seq[Expression]): Expression = { 
    functionBuilders(name)(children) 
  } 
} 
class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { 
/** 
 * Register a user-defined function with 1 arguments. 
 * @since 1.3.0 
 */ 
def register(name: String, f: UDF1[_, _], returnType: DataType) = { // internally this still registers through functionRegistry 
  functionRegistry.registerFunction( 
    name, 
    (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), returnType, e)) 
} 
}
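So the registry is essentially a name-to-builder map: registerFunction stores a FunctionBuilder and lookupFunction applies it to the argument expressions. A stripped-down toy version in plain Scala (ignoring case sensitivity and using strings instead of Catalyst expressions):

import scala.collection.mutable

object ToyFunctionRegistry {
  // In Catalyst, FunctionBuilder is Seq[Expression] => Expression; strings are used here for brevity.
  type Expression = String
  type FunctionBuilder = Seq[Expression] => Expression

  private val functionBuilders = mutable.HashMap.empty[String, FunctionBuilder]

  def registerFunction(name: String, builder: FunctionBuilder): Unit =
    functionBuilders(name) = builder

  def lookupFunction(name: String, children: Seq[Expression]): Expression =
    functionBuilders(name)(children)

  def main(args: Array[String]): Unit = {
    registerFunction("strlength", args => s"ScalaUdf(strlength, ${args.mkString(", ")})")
    println(lookupFunction("strlength", Seq("dev_chnid#26")))
    // prints ScalaUdf(strlength, dev_chnid#26), cf. scalaUDF(dev_chnid#26) in the log below
  }
}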

The log output is:

plan-> 
'Project ['strlength(dev_chnid#26) AS c0#43] 
 Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
result-> 
Project [scalaUDF(dev_chnid#26) AS c0#43] // strlength has been resolved into a scalaUDF 
 Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$@2b8199b7

3.5 GlobalAggregates

Resolves global aggregate functions in a SELECT, for example select MAX(ID).

object GlobalAggregates extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transform { 
    case Project(projectList, child) if containsAggregates(projectList) => // if the projection contains aggregate expressions, turn the Project into an Aggregate 
      Aggregate(Nil, projectList, child) 
  } 
 
  def containsAggregates(exprs: Seq[Expression]): Boolean = { 
    exprs.foreach(_.foreach { 
      case agg: AggregateExpression => return true 
      case _ => 
    }) 
    false 
  } 
}

For example, with the following SQL statement:

String sql = "SELECT MAX(id) from test";

the log output is:

16-07-19 14:17:59,708 INFO org.apache.spark.sql.SQLContext$$anon$1(Logging.scala:59) ## 
plan-> 
'Project [MAX(id#0L) AS c0#43L] 
 Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
  
result-> 
Aggregate [MAX(id#0L) AS c0#43L] // the Project has been rewritten into an Aggregate 
 Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
  
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates$@4a9e419a

3.6 UnresolvedHavingClauseAttributes

Resolves the aggregate filter condition of a HAVING clause: if the aggregated column it refers to does not appear after SELECT, it is added to the aggregate's output.

object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] { 
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { 
    case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _)) 
        if aggregate.resolved && containsAggregate(havingCondition) => { 
      val evaluatedCondition = Alias(havingCondition, "havingCondition")() 
      val aggExprsWithHaving = evaluatedCondition +: originalAggExprs // prepend the HAVING condition to the aggregate expressions 
      Project(aggregate.output, 
        Filter(evaluatedCondition.toAttribute, 
          aggregate.copy(aggregateExpressions = aggExprsWithHaving))) // so the condition becomes part of the aggregate's output 
    } 
  } 
  protected def containsAggregate(condition: Expression): Boolean = 
    condition 
      .collect { case ae: AggregateExpression => ae } 
      .nonEmpty 
}

For example, with the following SQL statement:

String sql = "SELECT SUM(car_speed) from test group by dev_chnname HAVING SUM(id) > 1";//id没有出现在select 之后

the log output is:

16-07-19 15:41:43,410 INFO  org.apache.spark.sql.SQLContext$$anon$1(Logging.scala:59) ## 
plan-> 
'Filter (SUM('id) > 1) 
 Aggregate [dev_chnname#4], [SUM(car_speed#8) AS c0#43] 
  Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
 
result-> 
'Project [c0#43] 
 'Filter 'havingCondition 
  'Aggregate [dev_chnname#4], [(SUM('id) > 1) AS havingCondition#44,SUM(car_speed#8) AS c0#43] // SUM(id) has been pushed down into the aggregate 
   Subquery test 
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] 
 
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$UnresolvedHavingClauseAttributes$@...
