Spark-SQL源码解析之二 SqlParser:sql -> unresolved logical plan 详解

前面章节讲解了Spark-SQL中的核心流程,接下来主要讲解如何将sql语句转化为UnResolved Logical Plan(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)。

/** Entry point: parses a SQL string into an unresolved [[LogicalPlan]].
 *  Delegates to `ddlParser` with `exceptionOnError = false`, so input that is
 *  not a DDL statement falls through to the general SQL parser (see
 *  `DDLParser.parse` below). */
protected[sql] def parseSql(sql: String): LogicalPlan = { 
  val ret = ddlParser.parse(sql, false) 
  ret 
} 
/** Parser for DDL statements.
 *  @param parseQuery fallback used for any input that fails to parse as DDL
 *                    (wired to `sqlParser.parse` where `ddlParser` is built). */
private[sql] class DDLParser( 
    parseQuery: String => LogicalPlan) 
  extends AbstractSparkSQLParser with DataTypeParser with Logging { 
 
  /** Tries DDL first; on failure, optionally falls back to `parseQuery`.
   *  @param exceptionOnError when true, any parse failure is rethrown instead
   *                          of being retried with the fallback parser. */
  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { 
    try { 
// First attempt: parse the input as a DDL statement.
      parse(input) 
    } catch { 
      // A genuine DDL error is always surfaced to the caller.
      case ddlException: DDLException => throw ddlException 
      // NOTE(review): this guarded catch-all also matches fatal throwables
      // (OOM, etc.) when exceptionOnError is false — confirm that is intended.
      case _ if !exceptionOnError => parseQuery(input)// Non-DDL input: retry with the fallback parser passed as the constructor argument.
      case x: Throwable => throw x 
    } 
  } 
} 
// DDL parser whose fallback for non-DDL statements is `sqlParser.parse`.
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) 
 
// Here the fallback is getSQLDialect().parse(_), i.e. the active dialect's parser.
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_)) 
 
/** Top-level command parser: recognizes CACHE/UNCACHE/SET/SHOW commands and
 *  hands everything else (e.g. SELECT) to `fallback`. */
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser { 
// Alternatives are tried left to right; `others` must stay last as the catch-all.
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others 
  // CACHE [LAZY] TABLE <ident> [AS <query>] — the optional query text after AS
  // is parsed with `fallback`.
  private lazy val cache: Parser[LogicalPlan] = 
  CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { 
    case isLazy ~ tableName ~ plan => 
      CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined) 
  } 
// UNCACHE TABLE <ident> | CLEAR CACHE
private lazy val uncache: Parser[LogicalPlan] = 
  ( UNCACHE ~ TABLE ~> ident ^^ { 
      case tableName => UncacheTableCommand(tableName) 
    } 
  | CLEAR ~ CACHE ^^^ ClearCacheCommand 
  ) 
// SET <anything> — the remainder of the input becomes the SET command body.
private lazy val set: Parser[LogicalPlan] = 
  SET ~> restInput ^^ { 
    case input => SetCommandParser(input) 
  } 
 
// SHOW TABLES [IN <database>]
private lazy val show: Parser[LogicalPlan] = 
  SHOW ~> TABLES ~ (IN ~> ident).? ^^ { 
    case _ ~ dbName => ShowTablesCommand(dbName) 
  } 
 
// Catch-all: consume the whole input and delegate to `fallback`.
private lazy val others: Parser[LogicalPlan] = 
  wholeInput ^^ { 
    case input => fallback(input)// SELECT statements (and the rest) are parsed by the fallback.
  } 
} 
//继续往下追踪getSQLDialect().parse(_)就是DefaultParserDialect.parse(_) 
/** Default SQL dialect: every request is delegated verbatim to [[SqlParser]]. */
private[spark] class DefaultParserDialect extends ParserDialect { 
  // Excluded from Java serialization of the dialect instance.
  @transient 
  protected val sqlParser = new SqlParser 
 
  // Single-expression form: hand the text straight to the underlying parser.
  override def parse(sqlText: String): LogicalPlan = sqlParser.parse(sqlText) 
} 
关键就是SqlParser,其核心的select解析规则如下:
/** Core SQL parser: builds an unresolved logical plan from a SELECT statement. */
class SqlParser extends AbstractSparkSQLParser with DataTypeParser { 
// Grammar:
//   SELECT [DISTINCT] proj,... [FROM rels] [WHERE e] [GROUP BY e,...]
//   [HAVING e] [sortType] [LIMIT e]
protected lazy val select: Parser[LogicalPlan] = 
  SELECT ~> DISTINCT.? ~ 
    repsep(projection, ",") ~ 
    (FROM   ~> relations).? ~ 
    (WHERE  ~> expression).? ~ 
    (GROUP  ~  BY ~> rep1sep(expression, ",")).? ~ 
    (HAVING ~> expression).? ~ 
    sortType.? ~ 
    (LIMIT  ~> expression).? ^^ { 
      case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>// The plan tree is assembled bottom-up in the order r, f, g, p, d, h, o, l.
        // No FROM clause means a single-row relation (e.g. SELECT 1).
        val base = r.getOrElse(OneRowRelation) 
        val withFilter = f.map(Filter(_, base)).getOrElse(base) 
        // With GROUP BY the projections become aggregate expressions;
        // otherwise a plain Project node is emitted.
        val withProjection = g 
          .map(Aggregate(_, assignAliases(p), withFilter)) 
          .getOrElse(Project(assignAliases(p), withFilter)) 
        val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection) 
        val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct) 
        // `o` is a function wrapping its child in the sort operator.
        val withOrder = o.map(_(withHaving)).getOrElse(withHaving) 
        val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder) 
        withLimit 
    } 
}

比方说:

Sql语句为:

SELECT id,dev_chnid,dev_chnname,car_num,car_speed,car_direct from test where id > 1 group by dev_chnid sort by car_num

unresolved logical plan为:

['Sort ['car_num ASC], false           // 最后是o

 'Aggregate ['dev_chnid],['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct]  // 接着是g

  'Filter ('id > 1)                    // 然后f

   'UnresolvedRelation [test],None     // 先解析r

]

可见其unresolved logical plan的语法树是根据select语句的解析顺序生成的。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9312.html

(0)
上一篇 2021年7月19日 09:24
下一篇 2021年7月19日 09:24

相关推荐

发表回复

登录后才能评论