这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
场景介绍:
大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS mongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,
如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。
加载方式:
build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;
third-party:第三方加载外部数据如HBase、S3、OSS mongoDB
第三方JAR地址:https://spark-packages.org/
Maven工程需要导入gav
spark-shell:需要外部导入–package g:a:v
SPARK_HOME/bin/spark-shell –packages com.databricks:spark-csv_2.11:1.5.0
优势:下载依赖包到本地
缺点:内网环境没有网络无法下载
一、外部数据源读取parquet文件:
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1536244013147).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_/ // _ // _ `/ __/ '_/
/___/ .__//_,_/_/ /_//_/ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.read.load("file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show
提示错误:/people.txt is not a Parquet file
注意:spark.read.load()底层默认读取Parquet file
scala> spark.read.load("file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show
18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
scala> val users = spark.read.load("file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")
users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string … 1 more field]
scala> users.printSchema
root
|– name: string (nullable = true)
|– favorite_color: string (nullable = true)
|– favorite_numbers: array (nullable = true)
| |– element: integer (containsNull = true)
scala> users.show
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
— 查看列,常规操作
scala> users.select("name").show
+------+ | name| +------+ |Alyssa| | Ben| +------+
二、转换操作
— 转成json格式输出
scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")
[hadoop@hadoop001 parquet]$ cat * {"name":"Alyssa"} {"name":"Ben","favorite_color":"red"}
— 不采取压缩
.option("compression","none")
— 转成text格式输出
scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")
[hadoop@hadoop001 parquet2]$ cat *
Alyssa
— Save Modes
用法:.mode("")
1、default — 目标目录存在,抛出异常
2、append
— 目标目录存在,重跑数据+1,无法保证数据幂等
3、overwrite– 目标目录存在,覆盖原文件
4、ignore– 忽略你的模式,目标纯在将不保存
三、spark-shell操作JDBC数据
— 读取外部MySQL数据为DF
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()
— 查看表信息
jdbcDF.show()
— 获取本地数据
val deptDF = spark.table("dept")
— join关联使用
deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))
— DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode("overwrite")
jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()
mysql> show tables
+---------------------------+ | Tables_in_hive_data | +---------------------------+ | bucketing_cols | | cds | | city_info_bak | +---------------------------+
— 如果想类型不发生变化指定option指定字段类型
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
四、spark-sql操作JDBC数据
— SQL创建临时表视图,单session
CREATE TEMPORARY VIEW emp_sql USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://hadoop001:3306/ruozedata", dbtable "city_info", user 'root', password 'root' )
show tbales;
INSERT INTO TABLE emp_sql
SELECT * FROM emp_sql
五、Perdicate Push Down(PPD)
disk
network CPU
外部数据外(1T)——->获取本地磁盘(1T)———->提交到集群(1T)———>结果(1G)
disk
network CPU
外部数据外(1T)——->经过列裁剪(10G)———–>提交到集群(10G)———–>传结果(1g)
disk
CPU network
外部数据外(1T)——->经过列裁剪(10G)———->进过计算(1G)———–>传输结果
六、SparkSQL外部数据源实现机制
— 0.有效的读取外部数据源的数据的
— 1.buildScan扫描整张表,变成一个RDD[ROW]
trait
TableScan
{
def
buildScan(): RDD[Row]
}
— 2.PrunedScan获取表的裁剪列
trait
PrunedScan
{
def
buildScan(requiredColumns: Array[String]): RDD[Row]
}
— 3.PrunedFilteredScan列裁剪,行过滤
trait
PrunedFilteredScan
{
def
buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
— 4.加载外部数据源的数据,定义数据的schema信息
abstract class
BaseRelation{
}
— 5.Relation产生BaseRelation使用
trait
RelationProvider
{
}
总归:
— 查询类操作
trait
PrunedScan
{
def
buildScan(requiredColumns: Array[String]): RDD[Row]
}
— 列裁剪,行过滤
trait
PrunedFilteredScan
{
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
— 写入类操作
trait
InsertableRelation
{
def
insert(data: DataFrame, overwrite: Boolean): Unit
}
上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/222760.html