Spark SQL, DataFrames 以及 Datasets 编程指南
概要
Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。
本文中所有的示例都使用Spark发布版本中自带的示例数据,并且可以在spark-shell、pyspark shell以及sparkR shell中运行。
SQL
Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。
DataFrames
Datasets
Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。
Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。
入门
入口:SQLContext
Spark SQL所有的功能入口都是SQLContext
类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。
val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 用于包含RDD到DataFrame隐式转换操作
import sqlContext.implicits._
除了SQLContext之外,你也可以创建HiveContext,HiveContext是SQLContext 的超集。
除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不需要安装Hive,而且SQLContext能用的数据源,HiveContext也一样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含所有的Hive依赖。如果这些依赖对你来说不是问题(不会造成依赖冲突等),建议你在Spark-1.3之前使用HiveContext。而后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差不多的状态。
spark.sql.dialect选项可以指定不同的SQL变种(或者叫SQL方言)。这个参数可以在SparkContext.setConf里指定,也可以通过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前唯一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,当然你也可以将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,所以大部分情况下,推荐使用HiveContext。
创建DataFrame
Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD
),或者Hive表,或者其他数据源(data sources.)
以下是一个从JSON文件创建DataFrame的小栗子:
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 将DataFrame内容打印到stdout
df.show()
DataFrame操作
DataFrame提供了结构化数据的领域专用语言支持,包括Scala, Java, Python and R.
这里我们给出一个结构化数据处理的基本示例:
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 展示 DataFrame 的内容
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin
// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// 计算各个年龄的人数
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
DataFrame的完整API列表请参考这里:API Documentation
除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.
编程方式执行SQL查询
创建Dataset
Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。
// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)
// 以下这行不仅定义了case class,同时也自动为其创建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
和RDD互操作
Spark SQL有两种方法将RDD转为DataFrame。
1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;
2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。
利用反射推导schema
Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。
// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._
// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)
// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)
编程方式定义Schema
如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:
For example:
例如:
// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// 数据的schema被编码与一个字符串中
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// 基于前面的字符串生成schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")
// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")
// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)
数据源
Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的Spark数据源,然后深入介绍一下内建数据源可用选项。
通用加载/保存函数
手动指定选项
你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接对文件使用SQL
保存模式
Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。
仅Scala/Java | 所有支持的语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
(默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。 |
SaveMode.Append |
"append" |
如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。 |
SaveMode.Overwrite |
"overwrite" |
如果数据或表已经存在,则用DataFrame数据覆盖之。 |
SaveMode.Ignore |
"ignore" |
如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。 |
保存到持久化表
在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不同,saveAsTable会将DataFrame的实际数据内容保存下来,并且在HiveMetastore中创建一个游标指针。持久化的表会一直保留,即使Spark程序重启也没有影响,只要你连接到同一个metastore就可以读取其数据。读取持久化表时,只需要用用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。
默认情况下,saveAsTable会创建一个”managed table“,也就是说这个表数据的位置是由metastore控制的。同样,如果删除表,其数据也会同步删除。
Parquet文件
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
编程方式加载数据
仍然使用上面例子中的数据:
// 我们继续沿用之前例子中的sqlContext对象
// 为了支持RDD隐式转成DataFrame
import sqlContext.implicits._
val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD
// 该RDD将隐式转成DataFrame,然后保存为parquet文件
people.write.parquet("people.parquet")
// 读取上面保存的Parquet文件(多个文件 - Parquet保存完其实是很多个文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加载Parquet文件,并返回DataFrame结果
val parquetFile = sqlContext.read.parquet("people.parquet")
// Parquet文件(多个)可以注册为临时表,然后在SQL语句中直接查询
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分区发现
像Hive这样的系统,一个很常用的优化手段就是表分区。在一个支持分区的表中,数据是保存在不同的目录中的,并且将分区键以编码方式保存在各个分区目录路径中。Parquet数据源现在也支持自动发现和推导分区信息。例如,我们可以把之前用的人口数据存到一个分区表中,其目录结构如下所示,其中有2个额外的字段,gender和country,作为分区键:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
在这个例子中,如果需要读取Parquet文件数据,我们只需要把 path/to/table 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能够自动的从路径中提取出分区信息,随后返回的DataFrame的schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据作为分区键。
有的用户可能不想要自动推导出来的分区键数据类型。这种情况下,你可以通过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用之后,分区键总是被当成字符串类型。
从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来说,如果用户把 path/to/table/gender=male 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被作为分区键。如果用户想要指定分区发现的基础目录,可以通过basePath选项指定。例如,如果把 path/to/table/gender=male作为数据目录,并且将basePath设为 path/to/table,那么gender仍然会最为分区键。
Schema合并
像ProtoBuffer、Avro和Thrift一样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增加所需的新字段。这样的话,用户最终会得到多个schema不同但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种情况,并合并所有文件的schema。
因为schema合并相对代价比较大,并且在多数情况下不是必要的,所以从Spark-1.5.0之后,默认是被禁用的。你可以这样启用这一功能:
// 继续沿用之前的sqlContext对象
// 为了支持RDD隐式转换为DataFrame
import sqlContext.implicits._
// 创建一个简单的DataFrame,存到一个分区目录中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建另一个DataFrame放到新的分区目录中,
// 并增加一个新字段,丢弃一个老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 最终的schema将由3个字段组成(single,double,triple)
// 并且分区键出现在目录路径中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
Hive metastore Parquet table转换
在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,因为这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,而且默认是启用的。
Hive/Parquet schema调和
Hive和Parquet在表结构处理上主要有2个不同点:
由于以上原因,我们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema做调整,调整规则如下:
刷新元数据
配置
Parquet配置可以通过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。 |
spark.sql.parquet.int96AsTimestamp |
true | 有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的作用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。 |
spark.sql.parquet.cacheMetadata |
true | 缓存Parquet schema元数据。可以提升查询静态数据的速度。 |
spark.sql.parquet.compression.codec |
gzip | 设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo |
spark.sql.parquet.filterPushdown |
true | 启用过滤器下推优化,可以讲过滤条件尽量推导最下层,已取得性能提升 |
spark.sql.hive.convertMetastoreParquet |
true | 如果禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持 |
spark.sql.parquet.output.committer.class |
org.apache.parquet.hadoop. |
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。一般来说,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 如果启用spark.speculation, 这个选项将被自动忽略
2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf 3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。 |
spark.sql.parquet.mergeSchema | false | 如果设为true,那么Parquet数据源将会merge 所有数据文件的schema,否则,schema是从summary file获取的(如果summary file没有设置,则随机选一个) |
JSON数据集
Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。
注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。
// sc是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 数据集是由路径指定的
// 路径既可以是单个文件,也可以还是存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 推导出来的schema,可由printSchema打印出来
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 将DataFrame注册为table
people.registerTempTable("people")
// 跑SQL语句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive表
Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。
Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过spark-submit命令的–jars和–file选项来提交这些文件。
如果使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也可以创建一个HiveContext。如果没有在hive-site.xml里配置,那么HiveContext将会自动在当前目录下创建一个metastore_db目录,再根据HiveConf设置创建一个warehouse目录(默认/user/hive/warehourse)。所以请注意,你必须把/user/hive/warehouse的写权限赋予启动spark应用程序的用户。
// sc是一个已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 这里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
和不同版本的Hive Metastore交互
Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL可以访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,可以用来访问不同版本的Hive metastore,其配置表如下。注意,不管所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,而且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)
以下选项可用来配置Hive版本以便访问其元数据:
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive metastore版本,可选的值为0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars |
builtin |
初始化HiveMetastoreClient的jar包。这个属性可以是以下三者之一:
目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一起打包。如果没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义 使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
一个逗号分隔的类名前缀列表,这些类在每个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数 |
用JDBC连接其他数据库
Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不需要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)
首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
远程数据库的表可以通过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。以下是选项列表:
属性名 | 含义 |
---|---|
url |
需要连接的JDBC URL |
dbtable |
需要读取的JDBC表。注意,任何可以填在SQL的where子句中的东西,都可以填在这里。(既可以填完整的表名,也可填括号括起来的子查询语句) |
driver |
JDBC driver的类名。这个类必须在master和worker节点上都可用,这样各个节点才能将driver注册到JDBC的子系统中。 |
partitionColumn, lowerBound, upperBound, numPartitions |
这几个选项,如果指定其中一个,则必须全部指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。因此,表中所有的行都会被分区然后返回。 |
fetchSize |
JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10) |
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename")).load()
疑难解答
- JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
- 一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。
性能调整
对于有一定计算量的Spark作业来说,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。
内存缓存
Spark SQL可以通过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减少内存占用和GC压力。你也可以用SQLContext.uncacheTable(“tableName”)来删除内存中的table。
你还可以使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 如果设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存批量的大小。增大批量大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。 |
其他配置选项
以下选项同样也可以用来给查询任务调性能。不过这些选项在未来可能被放弃,因为spark将支持越来越多的自动优化。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置join操作时,能够作为广播变量的最大table的大小。设置为-1,表示禁用广播。注意,目前的元数据统计仅支持Hive metastore中的表,并且需要运行这个命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan |
spark.sql.tungsten.enabled |
true | 设为true,则启用优化的Tungsten物理执行后端。Tungsten会显式的管理内存,并动态生成表达式求值的字节码 |
spark.sql.shuffle.partitions |
200 | 配置数据混洗(shuffle)时(join或者聚合操作),使用的分区数。 |
分布式SQL引擎
Spark SQL可以作为JDBC/ODBC或者命令行工具的分布式查询引擎。在这种模式下,终端用户或应用程序,无需写任何代码,就可以直接在Spark SQL中运行SQL查询。
运行Thrift JDBC/ODBC server
这里实现的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2
是相同的。你可以使用beeline脚本来测试Spark或者Hive-1.2.1的JDBC server。
在Spark目录下运行下面这个命令,启动一个JDBC/ODBC server
./sbin/start-thriftserver.sh
这个脚本能接受所有 bin/spark-submit 命令支持的选项参数,外加一个 –hiveconf 选项,来指定Hive属性。运行./sbin/start-thriftserver.sh –help可以查看完整的选项列表。默认情况下,启动的server将会在localhost:10000端口上监听。要改变监听主机名或端口,可以用以下环境变量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh /
--master <master-uri> /
...
或者Hive系统属性 来指定
./sbin/start-thriftserver.sh /
--hiveconf hive.server2.thrift.port=<listening-port> /
--hiveconf hive.server2.thrift.bind.host=<listening-host> /
--master <master-uri>
...
接下来,你就可以开始在beeline中测试这个Thrift JDBC/ODBC server:
./bin/beeline
下面的指令,可以连接到一个JDBC/ODBC server
beeline> !connect jdbc:hive2://localhost:10000
可能需要输入用户名和密码。在非安全模式下,只要输入你本机的用户名和一个空密码即可。对于安全模式,请参考beeline documentation.
Hive的配置是在conf/目录下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。
你也可以在beeline的脚本中指定。
Thrift JDBC server也支持通过HTTP传输Thrift RPC消息。以下配置(在conf/hive-site.xml中)将启用HTTP模式:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
同样,在beeline中也可以用HTTP模式连接JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
使用Spark SQL命令行工具
Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询。注意Spark SQL CLI目前还不支持和Thrift JDBC server通信。
用如下命令,在spark目录下启动一个Spark SQL CLI
./bin/spark-sql
Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你可以用这个命令查看完整的选项列表:./bin/spark-sql –help
升级指南
1.5升级到1.6
- 从Spark-1.6.0起,默认Thrift server 将运行于多会话并存模式下(multi-session)。这意味着,每个JDBC/ODBC连接有其独立的SQL配置和临时函数注册表。table的缓存仍然是公用的。如果你更喜欢老的单会话模式,只需设置spark.sql.hive.thriftServer.singleSession为true即可。当然,你也可在spark-defaults.conf中设置,或者将其值传给start-thriftserver.sh –conf(如下):
./sbin/start-thriftserver.sh /
--conf spark.sql.hive.thriftServer.singleSession=true /
...
1.4升级到1.5
- Tungsten引擎现在默认是启用的,Tungsten是通过手动管理内存优化执行计划,同时也优化了表达式求值的代码生成。这两个特性都可以通过把spark.sql.tungsten.enabled设为false来禁用。
- Parquet schema merging默认不启用。需要启用的话,设置spark.sql.parquet.mergeSchema为true即可
- Python接口支持用点(.)来访问字段内嵌值,例如df[‘table.column.nestedField’]。但这也意味着,如果你的字段名包含点号(.)的话,你就必须用重音符来转义,如:table.`column.with.dots`.nested。
- 列式存储内存分区剪枝默认是启用的。要禁用,设置spark.sql.inMemoryColumarStorage.partitionPruning为false即可
- 不再支持无精度限制的decimal。Spark SQL现在强制最大精度为38位。对于BigDecimal对象,类型推导将会使用(38,18)精度的decimal类型。如果DDL中没有指明精度,默认使用的精度是(10,0)
- 时间戳精确到1us(微秒),而不是1ns(纳秒)
- 在“sql”这个SQL变种设置中,浮点数将被解析为decimal。HiveQL解析保持不变。
- 标准SQL/DataFrame函数均为小写,例如:sum vs SUM。
- 当推测任务被启用是,使用DirectOutputCommitter是不安全的,因此,DirectOutputCommitter在推测任务启用时,将被自动禁用,且忽略相关配置。
- JSON数据源不再自动加载其他程序产生的新文件(例如,不是Spark SQL插入到dataset中的文件)。对于一个JSON的持久化表(如:Hive metastore中保存的表),用户可以使用REFRESH TABLE这个SQL命令或者HiveContext.refreshTable来把新文件包括进来。
1.3升级到1.4
DataFrame数据读写接口
根据用户的反馈,我们提供了一个新的,更加流畅的API,用于数据读(SQLContext.read)写(DataFrame.write),同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。
有关SQLContext.read和DataFrame.write的更详细信息,请参考API文档。
DataFrame.groupBy保留分组字段
根据用户的反馈,我们改变了DataFrame.groupBy().agg()的默认行为,在返回的DataFrame结果中保留了分组字段。如果你想保持1.3中的行为,设置spark.sql.retainGroupColumns为false即可。
// 在1.3.x中,如果要保留分组字段"department", 你必须显式的在agg聚合时包含这个字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))
// 而在1.4+,分组字段"department"默认就会包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))
// 要回滚到1.3的行为(不包含分组字段),按如下设置即可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
1.2升级到1.3
在Spark 1.3中,我们去掉了Spark SQL的”Alpha“标签,并清理了可用的API。从Spark 1.3起,Spark SQL将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:DeveloperAPI或Experimental)“的API。
SchemaRDD重命名为DataFrame
对于用户来说,Spark SQL 1.3最大的改动就是SchemaRDD改名为DataFrame。主要原因是,DataFrame不再直接由RDD派生,而是通过自己的实现提供RDD的功能。DataFrame只需要调用其rdd方法就能转成RDD。
在Scala中仍然有SchemaRDD,只不过这是DataFrame的一个别名,以便兼容一些现有代码。但仍然建议用户改用DataFrame。Java和Python用户就没这个福利了,他们必须改代码。
统一Java和Scala API
在Spark 1.3之前,有单独的java兼容类(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的镜像。Spark 1.3中将Java API和Scala API统一。两种语言的用户都应该使用SQLContext和DataFrame。一般这些类中都会使用两种语言中都有的类型(如:Array取代各语言独有的集合)。有些情况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。
另外,java特有的类型API被删除了。Scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。
隐式转换隔离,DSL包移除 – 仅针对scala
Spark 1.3之前的很多示例代码,都在开头用 import sqlContext._,这行将会导致所有的sqlContext的函数都被引入进来。因此,在Spark 1.3我们把RDDs到DataFrames的隐式转换隔离出来,单独放到SQLContext.implicits对象中。用户现在应该这样写:import sqlContext.implicits._
另外,隐式转换也支持由Product(如:case classes或tuples)组成的RDD,但需要调用一个toDF方法,而不是自动转换。
如果需要使用DSL(被DataFrame取代的API)中的方法,用户之前需要导入DSL(import org.apache.spark.sql.catalyst.dsl), 而现在应该要导入 DataFrame API(import org.apache.spark.sql.functions._)
移除org.apache.spark.sql中DataType别名 – 仅针对scala
Spark 1.3删除了sql包中的DataType类型别名。现在,用户应该使用 org.apache.spark.sql.types中的类。
UDF注册挪到sqlContext.udf中 – 针对java和scala
Python DataTypes不再是单例
在python中使用DataTypes,你需要先构造一个对象(如:StringType()),而不是引用一个单例。
Shark用户迁移指南
调度
用户可以通过如下命令,为JDBC客户端session设定一个Fair Scheduler pool。
SET spark.sql.thriftserver.scheduler.pool=accounting;
Reducer个数
在Shark中,默认的reducer个数是1,并且由mapred.reduce.tasks设定。Spark SQL废弃了这个属性,改为 spark.sql.shuffle.partitions, 并且默认200,用户可通过如下SET命令来自定义:
SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;
你也可以把这个属性放到hive-site.xml中来覆盖默认值。
目前,mapred.reduce.tasks属性仍然能被识别,并且自动转成spark.sql.shuffle.partitions
缓存
shark.cache表属性已经不存在了,并且以”_cached”结尾命名的表也不再会自动缓存。取而代之的是,CACHE TABLE和UNCACHE TABLE语句,用以显式的控制表的缓存:
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;
注意:CACHE TABLE tbl 现在默认是饥饿模式,而非懒惰模式。再也不需要手动调用其他action来触发cache了!
从Spark-1.2.0开始,Spark SQL新提供了一个语句,让用户自己控制表缓存是否是懒惰模式
CACHE [LAZY] TABLE [AS SELECT] ...
以下几个缓存相关的特性不再支持:
- 用户定义分区级别的缓存逐出策略
- RDD 重加载
- 内存缓存直接写入策略
兼容Apache Hive
Spark SQL设计时考虑了和Hive metastore,SerDes以及UDF的兼容性。目前这些兼容性斗是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本的Hive metastore(从0.12.0到1.2.1,参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore)
部署在已有的Hive仓库之上
Spark SQL Thrift JDBC server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的Hive安装版本。你不需要修改已有的Hive metastore或者改变数据的位置,或者表分区。
支持的Hive功能
Spark SQL 支持绝大部分Hive功能,如:
- Hive查询语句:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
- 所有的Hive操作符:
- Relational operators (
=
,⇔
,==
,<>
,<
,>
,>=
,<=
, etc) - Arithmetic operators (
+
,-
,*
,/
,%
, etc) - Logical operators (
AND
,&&
,OR
,||
, etc) - Complex type constructors
- Mathematical functions (
sign
,ln
,cos
, etc) - String functions (
instr
,length
,printf
, etc)
- Relational operators (
- 用户定义函数(UDF)
- 用户定义聚合函数(UDAF)
- 用户定义序列化、反序列化(SerDes)
- 窗口函数(Window functions)
- Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
- Unions
- 查询子句
SELECT col FROM ( SELECT a + b AS col from t1) t2
- 采样
- 执行计划详细(Explain)
- 分区表,包括动态分区插入
- 视图
- 所有Hive DDL(data definition language):
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
- 绝大部分Hive数据类型:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
不支持的Hive功能
以下是目前不支持的Hive特性的列表。多数是不常用的。
不支持的Hive常见功能
- bucket表:butcket是Hive表的一个哈希分区
不支持的Hive高级功能
- UNION类操作
- 去重join
- 字段统计信息收集:Spark SQL不支持同步的字段统计收集
Hive输入、输出格式
- CLI文件格式:对于需要回显到CLI中的结果,Spark SQL仅支持TextOutputFormat。
- Hadoop archive — Hadoop归档
Hive优化
一些比较棘手的Hive优化目前还没有在Spark中提供。有一些(如索引)对应Spark SQL这种内存计算模型来说并不重要。另外一些,在Spark SQL未来的版本中会支持。
- 块级别位图索引和虚拟字段(用来建索引)
- 自动计算reducer个数(join和groupBy算子):目前在Spark SQL中你需要这样控制混洗后(post-shuffle)并发程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
- 元数据查询:只查询元数据的请求,Spark SQL仍需要启动任务来计算结果
- 数据倾斜标志:Spark SQL不会理会Hive中的数据倾斜标志
STREAMTABLE
join提示:Spark SQL里没有这玩艺儿- 返回结果时合并小文件:如果返回的结果有很多小文件,Hive有个选项设置,来合并小文件,以避免超过HDFS的文件数额度限制。Spark SQL不支持这个。
参考
数据类型
Spark SQL和DataFrames支持如下数据类型:
- Numeric types(数值类型)
ByteType
: 1字节长的有符号整型,范围:-128
到127
.ShortType
: 2字节长有符号整型,范围:-32768
到32767
.IntegerType
: 4字节有符号整型,范围:-2147483648
到2147483647
.LongType
: 8字节有符号整型,范围:-9223372036854775808
to9223372036854775807
.FloatType
: 4字节单精度浮点数。DoubleType
: 8字节双精度浮点数DecimalType
: 任意精度有符号带小数的数值。内部使用java.math.BigDecimal, BigDecimal包含任意精度的不缩放整型,和一个32位的缩放整型
- String type(字符串类型)
StringType
: 字符串
- Binary type(二进制类型)
BinaryType
: 字节序列
- Boolean type(布尔类型)
BooleanType
: 布尔类型
- Datetime type(日期类型)
TimestampType
: 表示包含年月日、时分秒等字段的日期DateType
: 表示包含年月日字段的日期
- Complex types(复杂类型)
ArrayType(elementType, containsNull)
:数组类型,表达一系列的elementType类型的元素组成的序列,containsNull表示数组能否包含null值MapType(keyType, valueType, valueContainsNull)
:映射集合类型,表示一个键值对的集合。键的类型是keyType,值的类型则由valueType指定。对应MapType来说,键是不能为null的,而值能否为null则取决于valueContainsNull。StructType(fields):
表示包含StructField序列的结构体。- StructField(name, datatype, nullable): 表示StructType中的一个字段,name是字段名,datatype是数据类型,nullable表示该字段是否可以为空
所有Spark SQL支持的数据类型都在这个包里:org.apache.spark.sql.types,你可以这样导入之:
import org.apache.spark.sql.types._
Data type | Value type in Scala | API to access or create a data type |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | java.math.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
DateType | java.sql.Date | DateType |
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull])注意:默认containsNull为true |
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull])注意:默认valueContainsNull为true |
StructType | org.apache.spark.sql.Row | StructType(fields)注意:fields是一个StructFields的序列,并且同名的字段是不允许的。 |
StructField | 定义字段的数据对应的Scala类型(例如,如果StructField的dataType为IntegerType,则其数据对应的scala类型为Int) | StructField(name, dataType, nullable) |
NaN语义
这是Not-a-Number的缩写,某些float或double类型不符合标准浮点数语义,需要对其特殊处理:
- NaN == NaN,即:NaN和NaN总是相等
- 在聚合函数中,所有NaN分到同一组
- NaN在join操作中可以当做一个普通的join key
- NaN在升序排序中排到最后,比任何其他数值都大
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/116945.html