This article walks through how Delta 0.7.0, integrated with Spark 3.0.1, implements its custom SQL.
The custom DeltaDataSource
When we use Delta, we have to specify the delta format explicitly, as follows:
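    val data = spark.range(5, 10)
    // Write the rows as a Delta table.
    data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
    // Read the table back (the original snippet used df without defining it).
    val df = spark.read.format("delta").load("/tmp/delta-table")
    df.show()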
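So how does this delta data source get integrated into Spark? Let's analyze it.
Go straight to DataStreamWriter (the batch DataFrameWriter resolves the format through the same DataSource.lookupDataSource call):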
    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
    val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
      // file source v2 does not support streaming yet.
      classOf[FileDataSourceV2].isAssignableFrom(cls)
The DataSource.lookupDataSource method is the key point:
    def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
      val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
        case name if name.equalsIgnoreCase("orc") &&
            conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
          classOf[OrcDataSourceV2].getCanonicalName
        case name if name.equalsIgnoreCase("orc") &&
            conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
          "org.apache.spark.sql.hive.orc.OrcFileFormat"
        case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
          "org.apache.spark.sql.avro.AvroFileFormat"
        case name => name
      }
      val provider2 = s"$provider1.DefaultSource"
      val loader = Utils.getContextOrSparkClassLoader
      val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
      // ... (the excerpt ends here)
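This uses ServiceLoader.load, which is Java's SPI (Service Provider Interface) mechanism. The details are easy to find online, so we focus on the essential part and go straight to ServiceLoader.LazyIterator: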
    private class LazyIterator implements Iterator<S> {
        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        Iterator<String> pending = null;
        String nextName = null;

        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }

        private boolean hasNextService() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }
            // ... (the excerpt ends here)
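The loader.getResources call looks up a specific file on the classpath and returns every match if there is more than one. For Spark, the service class is DataSourceRegister, so the file being searched for is META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. Spark's built-in data source implementations are loaded in exactly this way.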
Looking at Delta's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file, its content is org.apache.spark.sql.delta.sources.DeltaDataSource. Note that DeltaDataSource is developed against the DataSource V1 API. This is the basic mechanism through which the delta data source plugs into Spark.
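The SPI lookup described above can be reproduced without Spark. The following is a minimal, self-contained sketch, not Spark's actual code: FormatRegister and DemoDeltaSource are hypothetical stand-ins for DataSourceRegister and DeltaDataSource, and the provider file is written to a temporary directory purely so the example can run on its own. It shows a short name such as "delta" being resolved to a class through META-INF/services.

```scala
import java.net.URLClassLoader
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.ServiceLoader

object SpiLookupDemo {
  // Hypothetical stand-in for org.apache.spark.sql.sources.DataSourceRegister.
  trait FormatRegister { def shortName(): String }

  // Hypothetical provider; ServiceLoader needs a public no-arg constructor.
  class DemoDeltaSource extends FormatRegister {
    override def shortName(): String = "delta"
  }

  // Builds a class loader whose classpath contains one provider-configuration file:
  // META-INF/services/<service class name>, listing the implementation class name.
  private def loaderWithServiceFile(): ClassLoader = {
    val root = Files.createTempDirectory("spi-demo")
    val services = Files.createDirectories(root.resolve("META-INF").resolve("services"))
    Files.write(
      services.resolve(classOf[FormatRegister].getName),
      classOf[DemoDeltaSource].getName.getBytes(StandardCharsets.UTF_8))
    new URLClassLoader(Array(root.toUri.toURL), getClass.getClassLoader)
  }

  // Resolves a short name to the registered class, ignoring case.
  def lookup(provider: String): Option[Class[_]] = {
    val it = ServiceLoader.load(classOf[FormatRegister], loaderWithServiceFile()).iterator()
    var found: Option[Class[_]] = None
    while (found.isEmpty && it.hasNext) {
      val candidate = it.next()
      if (candidate.shortName().equalsIgnoreCase(provider)) found = Some(candidate.getClass)
    }
    found
  }

  def main(args: Array[String]): Unit =
    println(lookup("delta").map(_.getName))
}
```

In a real library the provider file ships inside the jar, which is how Delta registers DeltaDataSource; nothing is written at runtime.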
Analysis
We start from how Delta configures the SparkSession:
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("...")
      .master("...")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
Note the line config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").
The Spark configuration documentation describes spark.sql.extensions as follows:
A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.
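In short, it is the hook for extending a SparkSession, including how Spark SQL parses and processes logical plans, and it has been available since Spark 2.2.0.
Now look at the io.delta.sql.DeltaSparkSessionExtension class: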
    class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectParser { (session, parser) =>
          new DeltaSqlParser(parser)
        }
        extensions.injectResolutionRule { session =>
          new DeltaAnalysis(session, session.sessionState.conf)
        }
        extensions.injectCheckRule { session =>
          new DeltaUnsupportedOperationsCheck(session)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableUpdate(session.sessionState.conf)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableMerge(session.sessionState.conf)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableDelete(session.sessionState.conf)
        }
      }
    }
The DeltaSqlParser class is what supports Delta's own syntax. So how does it do that, and what exactly does it support? Let's look at the code behind extensions.injectParser:
    private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]

    private[sql] def buildParser(
        session: SparkSession,
        initial: ParserInterface): ParserInterface = {
      parserBuilders.foldLeft(initial) { (parser, builder) =>
        builder(session, parser)
      }
    }

    /**
     * Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session
     * and an initial parser. The latter allows for a user to create a partial parser and to delegate
     * to the underlying parser for completeness. If a user injects more parsers, then the parsers
     * are stacked on top of each other.
     */
    def injectParser(builder: ParserBuilder): Unit = {
      parserBuilders += builder
    }
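We can see that buildParser applies each injected builder to the parser built so far. For our builder this means DeltaSqlParser is constructed with its delegate field set to initial. buildParser itself is called from BaseSessionStateBuilder: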
    /**
     * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
     *
     * Note: this depends on the `conf` field.
     */
    protected lazy val sqlParser: ParserInterface = {
      extensions.buildParser(session, new SparkSqlParser(conf))
    }
So the actual argument for initial is a SparkSqlParser; in other words, SparkSqlParser becomes the delegate of DeltaSqlParser. Now look at DeltaSqlParser's parsePlan method:
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
      builder.visit(parser.singleStatement()) match {
        case plan: LogicalPlan => plan
        case _ => delegate.parsePlan(sqlText)
      }
    }
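The stacking done by buildParser and the try-then-delegate logic of parsePlan can be sketched in plain Scala. This is only an illustration under simplifying assumptions: Parser, BaseParser and KeywordParser are hypothetical names, a "plan" is just a String, and the builder takes no SparkSession argument, unlike Spark's real ParserBuilder.

```scala
object ParserStackingSketch {
  // Hypothetical, minimal stand-in for Spark's ParserInterface.
  trait Parser { def parsePlan(sql: String): String }

  // Plays the role of SparkSqlParser: the base parser that accepts everything.
  object BaseParser extends Parser {
    def parsePlan(sql: String): String = s"spark:$sql"
  }

  // Plays the role of DeltaSqlParser: handles one keyword itself, delegates the rest.
  class KeywordParser(keyword: String, delegate: Parser) extends Parser {
    def parsePlan(sql: String): String =
      if (sql.trim.toUpperCase.startsWith(keyword)) s"${keyword.toLowerCase}:$sql"
      else delegate.parsePlan(sql)
  }

  type ParserBuilder = Parser => Parser

  // Same shape as SparkSessionExtensions.buildParser:
  // each builder wraps the parser built so far, starting from the initial one.
  def buildParser(builders: Seq[ParserBuilder], initial: Parser): Parser =
    builders.foldLeft(initial)((parser, builder) => builder(parser))

  // Two injected parsers; the last one ends up outermost.
  val builders: Seq[ParserBuilder] = Seq(
    p => new KeywordParser("VACUUM", p),
    p => new KeywordParser("GENERATE", p))

  val parser: Parser = buildParser(builders, BaseParser)
}
```

With these definitions, ParserStackingSketch.parser.parsePlan("SELECT 1") falls through both keyword parsers and reaches BaseParser, while a statement starting with VACUUM is handled by the first injected parser even though the second one sits on top of it.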
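DeltaSqlParser.parsePlan relies on ANTLR4. When parsing a logical plan, DeltaSqlParser handles the statement itself if it can; otherwise it delegates to SparkSqlParser. The parsing itself is done by the DeltaSqlAstBuilder class: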
    class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {

      /**
       * Create a [[VacuumTableCommand]] logical plan. Example SQL:
       * {{{
       *   VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN];
       * }}}
       */
      override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
        VacuumTableCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier),
          Option(ctx.number).map(_.getText.toDouble),
          ctx.RUN != null)
      }

      override def visitDescribeDeltaDetail(
          ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
        DescribeDeltaDetailCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier))
      }

      override def visitDescribeDeltaHistory(
          ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
        DescribeDeltaHistoryCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier),
          Option(ctx.limit).map(_.getText.toInt))
      }

      override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
        DeltaGenerateCommand(
          modeName = ctx.modeName.getText,
          tableId = visitTableIdentifier(ctx.table))
      }

      override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
        ConvertToDeltaCommand(
          visitTableIdentifier(ctx.table),
          Option(ctx.colTypeList).map(colTypeList => StructType(visitColTypeList(colTypeList))),
          None)
      }

      override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
        visit(ctx.statement).asInstanceOf[LogicalPlan]
      }

      protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
        ctx.identifier.asScala match {
          case Seq(tbl) => TableIdentifier(tbl.getText)
          case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
          case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
        }
      }

      override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
    }
So where do methods such as visitVacuumTable and visitDescribeDeltaDetail come from? Let's look at DeltaSqlBase.g4:
    singleStatement
        : statement EOF
        ;

    // If you add keywords here that should not be reserved, add them to 'nonReserved' list.
    statement
        : VACUUM (path=STRING | table=qualifiedName)
            (RETAIN number HOURS)? (DRY RUN)?                               #vacuumTable
        | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName)      #describeDeltaDetail
        | GENERATE modeName=identifier FOR TABLE table=qualifiedName        #generate
        | (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
            (LIMIT limit=INTEGER_VALUE)?                                    #describeDeltaHistory
        | CONVERT TO DELTA table=qualifiedName
            (PARTITIONED BY '(' colTypeList ')')?                           #convert
        | .*?                                                               #passThrough
        ;
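The ANTLR4 grammar syntax used here is not covered in this article; look it up if you are unfamiliar with it. Note that both Spark and Delta use ANTLR's visitor pattern: each labeled alternative such as #vacuumTable makes ANTLR generate a matching visit method (visitVacuumTable) in DeltaSqlBaseBaseVisitor. Anything the other alternatives do not match falls into #passThrough, whose visitPassThrough returns null, which is what makes parsePlan fall back to the delegate.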
Now compare this with the operations listed on the Delta website:
Vacuum
Describe History
Describe Detail
Generate
Convert to Delta
Convert Delta table to a Parquet table
Now the two line up: for example, the Vacuum operation corresponds to vacuumTable, and Convert to Delta corresponds to convert.
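To make the mapping from a grammar alternative to a command more concrete, here is a small sketch of how a VACUUM statement ends up as the four arguments seen in visitVacuumTable. It is an assumption-laden illustration: a regular expression stands in for the ANTLR-generated parser, and VacuumCommand is a hypothetical, simplified counterpart of Delta's VacuumTableCommand.

```scala
object VacuumSketch {
  // Hypothetical, simplified counterpart of Delta's VacuumTableCommand.
  final case class VacuumCommand(
      path: Option[String],
      table: Option[String],
      retainHours: Option[Double],
      dryRun: Boolean)

  // Regex stand-in (not ANTLR) for the rule:
  //   VACUUM (path=STRING | table=qualifiedName) (RETAIN number HOURS)? (DRY RUN)?
  private val Vacuum =
    """(?i)\s*VACUUM\s+(?:'([^']*)'|([\w.`/-]+))(?:\s+RETAIN\s+(\d+(?:\.\d+)?)\s+HOURS)?(\s+DRY\s+RUN)?\s*;?\s*""".r

  def parse(sql: String): Option[VacuumCommand] = sql match {
    case Vacuum(path, table, hours, dry) =>
      // Groups that did not take part in the match are null, hence Option(...).
      Some(VacuumCommand(Option(path), Option(table), Option(hours).map(_.toDouble), dry != null))
    case _ =>
      None // plays the role of #passThrough: not ours, let another parser handle it
  }
}
```

For instance, VacuumSketch.parse("VACUUM '/tmp/delta-table' RETAIN 168 HOURS DRY RUN") yields a command with the path set, no table, 168.0 retention hours and dryRun enabled, while VacuumSketch.parse("SELECT 1") returns None.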
In effect, this is how Delta extends Spark. We can follow the same approach to extend Spark ourselves and implement our own SQL syntax.
Original article, author: 3628473679. If you repost it, please cite the source: https://blog.ytso.com/223001.html