四、spark–sparkSQL原理和使用

[TOC]

一、spark SQL概述

1.1 什么是spark SQL

​ Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。类似于hive的作用。

1.2 spark SQL的特点

1、容易集成:安装Spark的时候,已经集成好了。不需要单独安装。
2、统一的数据访问方式:JDBC、JSON、Hive、parquet文件(一种列式存储文件,是SparkSQL默认的数据源,hive中也支持)
3、完全兼容Hive。可以将Hive中的数据,直接读取到Spark SQL中处理。
一般在生产中,基本都是使用hive做数据仓库存储数据,然后用spark从hive读取数据进行处理。
4、支持标准的数据连接:JDBC、ODBC
5、计算效率比基于mr的hive高,而且hive2.x版本中,hive建议使用spark作为执行引擎

二、spark SQL基本原理

2.1 DataFrame和DataSet基本概念

2.1.1 DataFrame

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,里面有表的结构以及数据,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,
例如:
结构化数据文件
hive中的表
外部数据库或现有RDDs
DataFrame API支持的语言有Scala,Java,Python和R。

​ 比起RDD,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

2.1.2 DataSet

Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

2.2 创建DataFrame的方式

2.2.1 SparkSession对象

​ Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
​ 在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。
​ 要注意一点,在我用的这个spark版本中,直接使用new SQLContext() 来创建SQLContext对象,会显示该方式已经被弃用了(IDEA中会显示已弃用),建议使用SparkSession来获取SQLContext对象。

2.2.2 通过case class样本类

这种方式在scala中比较常用,因为case class是scala的特色

/**
表 t_stu 的结构为:
id name age
*/

object CreateDF {
  def main(args: Array[String]): Unit = {
    //这是最新的获取SQLContext对象的方式
    //2、创建SparkSession对象,设置master,appname
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    //3、通过spark获取sparkContext对象,读取数据
    val lines = spark.sparkContext.textFile("G://test//t_stu.txt").map(_.split(","))

    //4、将数据映射到case class中,也就是数据映射到表的对应字段中
    val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
    //这里必须要加上隐式转换,否则无法调用 toDF 函数
    import spark.sqlContext.implicits._

    //5、生成df
    val df2 = tb.toDF()

    //相当于select name from t_stu
    df1.select($"name").show()

    //关闭spark对象
    spark.stop()
  }
}

/*1、定义case class,每个属性对应表中的字段名以及类型
     一般生产中为了方便,会全部定义为string类型,然后有需要的时候
     才根据实际情况将string转为需要的类型
   这一步相当于定义表的结构
*/
case class emp(id:Int,name:String,age:Int)

总结步骤为:

1、定义case class,用来表结构
2、创建sparkSession对象,用来读取数据
3、将rdd中的数据和case class映射
4、调用 toDF 函数将rdd转为 DataFrame

2.2.3 通过StructType类

这种方式java比较常用

package SparkSQLExer

import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
  * 创建dataschema方式2:
  * 通过spark session对象创建,表结构通过StructType创建
  */
object CreateDF02 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()

    //1、通过StructType创建表结构schema,里面表的每个字段使用 StructField定义
    val tbSchema = StructType(List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      ))

    //2、读取数据
    var lines = sparkS.sparkContext.textFile("G://test//t_stu.txt").map(_.split(","))

    //3、将数据映射为ROW对象
    val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))

    //4、创建表结构和表数据映射,返回的就是df
    val df2 = sparkS.createDataFrame(rdd1, tbSchema)

    //打印表结构
    df2.printSchema()

    sparkS.stop()

  }

}

总结步骤为:

1、通过StructType创建表结构schema,里面表的每个字段使用 StructField定义
2、通过sparkSession.sparkContext读取数据
3、将数据映射格式为Row对象
4、将StructType和数据Row对象映射,返回df

2.2.4 使用json等有表格式的文件类型

package SparkSQLExer

import org.apache.spark.sql.SparkSession

/**
  * 创建df方式3:通过有格式的文件直接导入数据以及表结构,比如json格式的文件
  * 返回的直接就是一个DF
  */
object CreateDF03 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()

    //读取json方式1:
    val jsonrdd1= sparkS.read.json("path")

    //读取json方式2:
    val jsonrdd1= sparkS.read.format("json").load("path")

    sparkS.stop()
  }
}

这种方式比较简单,就是直接读取json文件而已
sparkS.read.xxxx读取任意文件时,返回的都是DF对象

2.3 操作DataFrame

2.3.1 DSL语句

DSL语句其实就是将sql语句的一些操作转为类似函数的方式去调用,比如:

df1.select("name").show

例子:

为了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077

1、打印表结构
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)

2、显示当前df的表数据或者查询结果的数据
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+

3、执行select, 相当于select xxx form  xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
|  WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
|  KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
|  FORD|3000|
|MILLER|1300|
+------+----+

4、对某些列进行操作
对某个指定进行操作时,需要加上$符号,然后后面才能操作
$代表 取出来以后,再做一些操作。
注意:这个 $ 的用法在ideal中无法正常使用,解决方法下面说
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800|        900|
| ALLEN|1600|       1700|
|  WARD|1250|       1350|
| JONES|2975|       3075|
|MARTIN|1250|       1350|
| BLAKE|2850|       2950|
| CLARK|2450|       2550|
| SCOTT|3000|       3100|
|  KING|5000|       5100|
|TURNER|1500|       1600|
| ADAMS|1100|       1200|
| JAMES| 950|       1050|
|  FORD|3000|       3100|
|MILLER|1300|       1400|
+------+----+-----------+

5、过滤行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7698|BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782|CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788|SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7902| FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
+-----+-----+---------+----+----------+----+----+------+

6、分组以及计数
scala> df1.groupBy($"deptno").count.show
+------+-----+                                                                  
|deptno|count|
+------+-----+
|    20|    5|
|    10|    3|
|    30|    6|
+------+-----+

上面说到在ide中 select($"name")中无法正常使用,解决方法为:

在该语句之前加上这么一句:
import spark.sqlContext.implicits._

主要还是因为类型的问题,加上隐式转换就好了

2.3.2 sql语句

df对象不能直接执行sql。需要生成一个视图,再执行SQL。
需要指定创建的视图名称,后面视图名称就相当于表名。
视图后面还会细说,这里先有个概念
例子:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通过df对象创建临时视图。视图名就相当于表名
df1.createOrReplaceTempView("emp")

//通过sparksession对象执行执行
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select deptno,count(1) from emp group by deptno").show

//可以创建多个视图,不冲突
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.deptno from emp12345 e").show

2.3.3 多表查询

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24

scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28

scala> val df2 = allDept.toDF
df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

scala> df2.create
createGlobalTempView   createOrReplaceTempView   createTempView

scala> df2.createOrReplaceTempView("dept")

scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show
+----------+------+                                                             
|     dname| ename|
+----------+------+
|  RESEARCH| SMITH|
|  RESEARCH| JONES|
|  RESEARCH| SCOTT|
|  RESEARCH| ADAMS|
|  RESEARCH|  FORD|
|ACCOUNTING| CLARK|
|ACCOUNTING|  KING|
|ACCOUNTING|MILLER|
|     SALES| ALLEN|
|     SALES|  WARD|
|     SALES|MARTIN|
|     SALES| BLAKE|
|     SALES|TURNER|
|     SALES| JAMES|
+----------+------+

2.4 创建DataSet

2.4.1 通过case class

和DataFrame类似,只是把 toDF改为调用 toDS 方法

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    val lines = spark.sparkContext.textFile("G://test//t_stu.txt").map(_.split(","))
    val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
    import spark.sqlContext.implicits._
    val df1 = tb.toDS()
    df1.select($"name")

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.2 通过序列Seq类对象

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

    //创建一个序列对象,里面都是emp1对象,映射的数据,然后直接toDS转为DataSet
    val ds1 = Seq(emp1(1,"king",20)).toDS()
    ds1.printSchema()

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.3 使用json格式文件

定义case class
case class Person(name:String,age:BigInt)

使用JSON数据生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")

将DataFrame转换成DataSet
df.as[Person].show

df.as[Person] 是一个 DataSet
as[T]中的泛型需要是一个case class类,用于映射表头

2.5 操作DataSet

DataSet支持的算子其实就是rdd和DataFrame算子的结合。

使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    20| 7369| SMITH|1980/12/17|    CLERK|7902| 800|
| 300|    30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500|    30| 7521|  WARD| 1981/2/22| SALESMAN|7698|1250|
|    |    20| 7566| JONES|  1981/4/2|  MANAGER|7839|2975|
|1400|    30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
|    |    30| 7698| BLAKE|  1981/5/1|  MANAGER|7839|2850|
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    20| 7788| SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|   0|    30| 7844|TURNER|  1981/9/8| SALESMAN|7698|1500|
|    |    20| 7876| ADAMS| 1987/5/23|    CLERK|7788|1100|
|    |    30| 7900| JAMES| 1981/12/3|    CLERK|7698| 950|
|    |    20| 7902|  FORD| 1981/12/3|  ANALYST|7566|3000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

scala> empDF.where($"sal" >= 3000).show
+----+------+-----+-----+----------+---------+----+----+
|comm|deptno|empno|ename|  hiredate|      job| mgr| sal|
+----+------+-----+-----+----------+---------+----+----+
|    |    20| 7788|SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839| KING|1981/11/17|PRESIDENT|    |5000|
|    |    20| 7902| FORD| 1981/12/3|  ANALYST|7566|3000|
+----+------+-----+-----+----------+---------+----+----+

#### empDF 转换成 DataSet 需要 case class

scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
defined class Emp

scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]

scala> empDS.filter(_.sal > 3000).show
+----+------+-----+-----+----------+---------+---+----+
|comm|deptno|empno|ename|  hiredate|      job|mgr| sal|
+----+------+-----+-----+----------+---------+---+----+
|    |    10| 7839| KING|1981/11/17|PRESIDENT|   |5000|
+----+------+-----+-----+----------+---------+---+----+

scala> empDS.filter(_.deptno == 10).show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

多表查询:

1、创建部门表
scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]

scala> deptDS.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

2、员工表
同上 empDS

empDS.join(deptDS,"deptno").where(xxxx) 连接两个表,通过deptno字段
empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 这个用于连接的字段名称不一样的情况

2.6 视图view

​ 如果想使用标准的sql语句来操作df或者ds对象时,必须先给df或者ds对象创建视图,然后通过SparkSession对象的sql函数来对相应的视图进行操作才可以。那么视图是什么?
​ 视图是一个虚表,不存储数据,可以当做是表的一个访问链接。视图有两种类型:
普通视图:也叫本地视图,只在当前session会话中有效
全局视图:在全部session中都有效,全局视图创建在指定命名空间中:global_temp 类似于一个库
操作说明:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

创建本地视图:
empDF.createOrReplaceTempView(视图名),视图存在就会重新创建
empDF.createTempView(视图名),如果视图存在就不会创建

创建全局视图:
empDF.createGlobalTempView(视图名)

对视图执行sql操作,这里视图名就类似于表名
spark.sql("xxxxx")

例子:
empDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show

注意,只要创建了视图,那么就可以通过sparksession对象在任意一个类中操作视图,也就是表。这个特性很好用,当我们要操作一些表时,可以一开始就读取成df,然后创建成视图,那么就可以在任意一个地方查询表了。

2.7 数据源

通过SparkSession对象可以读取不同格式的数据源:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

下面都用上面的spark代称SparkSession。

2.7.1 SparkSession读取数据的方式

1、load
spark.read.load(path):读取指定路径的文件,要求文件存储格式为Parquet文件

2、format
spark.read.format("格式").load(path) :指定读取其他格式的文件,如json
例子:
spark.read.format("json").load(path)

3、直接读取其他格式文件
spark.read.格式名(路径),这是上面2中的一个简写方式,例子:
spark.read.json(路径)  json格式文件
spark.read.text(路径)  读取文本文件

注意:这些方式返回的都是 DataFrame 对象

2.7.2 SparkSession保存数据的方式

可以将DataFrame 对象写入到指定格式的文件中,假设有个DataFrame 对象为df1.

1、save
df1.write.save(路径) 
他会将文件保存到这个目录下,文件名spark随机生成的,所以使用上面的读取方式的时候,直接指定读取目录即可,不用指定文件名。输出的文件格式为 Parquet。可以直接指定hdfs的路径,否则就存储到本地
如:
df1.write.save("/test")
spark.read.load("/test")

2、直接指定格式存储
df1.write.json(路径)  这样就会以json格式保存文件,生成的文件名的情况和上面类似

3、指定保存模式
如果没有指定保存模式,输出路径存在的情况下,就会报错
df1.write.mode("append").json(路径) 
mode("append") 就表示文件存在时就追加
mode("overwrite") 表示覆盖旧数据

4、保存为表
df1.write.saveAsTable(表名) 会保存在当前目录的spark-warehouse 目录下

5、format
df1.write.format(格式).save()
使用指定特定格式的方式来输出保存数据,比如保存到MongoDB数据库中

2.7.3 Parquet格式

​ 这种一种列式存储格式,具体原理可以看看之前hive的文章。这种格式是默认的存储格式,使用load和save时默认的格式,操作方式很像前面说的,这里不重复。这里要讲的是Parquet的一个特殊的功能,支持schema(表结构)的合并。例子:

scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

scala> df1.show
+------+------+
|single|double|
+------+------+
|     1|     2|
|     2|     4|
|     3|     6|
|     4|     8|
|     5|    10|
+------+------+

scala> sc.makeRDD(1 to 5)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25

scala> sc.makeRDD(1 to 5).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5)

//导出表1
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")

scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.show
+------+------+
|single|triple|
+------+------+
|     6|    18|
|     7|    21|
|     8|    24|
|     9|    27|
|    10|    30|
+------+------+

//导出表2
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")

scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]

//直接读取会丢失字段
scala> df3.show
+------+------+---+
|single|double|key|
+------+------+---+
|     8|  null|  2|
|     9|  null|  2|
|    10|  null|  2|
|     3|     6|  1|
|     4|     8|  1|
|     5|    10|  1|
|     6|  null|  2|
|     7|  null|  2|
|     1|     2|  1|
|     2|     4|  1|
+------+------+---+

//加上option,指定"mergeSchema"为true,就可以合并
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
|     8|  null|    24|  2|
|     9|  null|    27|  2|
|    10|  null|    30|  2|
|     3|     6|  null|  1|
|     4|     8|  null|  1|
|     5|    10|  null|  1|
|     6|  null|    18|  2|
|     7|  null|    21|  2|
|     1|     2|  null|  1|
|     2|     4|  null|  1|
+------+------+------+---+

补充问题:key 是什么?必须用key嘛?
key是不同表的一个区分字段,在合并的时候,会作为合并之后的表的一个字段,并且值等于key=xx 中设置的值
如果目录下,两个表的目录名不一样,是无法合并的,合并字段名可以任意,
如:一个是key ,一个是 test 这两个无法合并,必须统一key或者test

2.7.4 json文件

这种一种带表格式字段的文件,例子:

scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

scala> peopleDF.createOrReplaceTempView("people")

scala> spark.sql("select * from people where age=19")
res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> spark.sql("select * from people where age=19").show
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+

scala> spark.sql("select age,count(1) from people group by  age").show
+----+--------+                                                                 
| age|count(1)|
+----+--------+
|  19|       1|
|null|       1|
|  30|       1|
+----+--------+

2.7.5 JDBC 连接

df对象支持通过jdbc连接数据库,写入数据到数据库,或者从数据库读取数据。
例子:
1、通过jdbc 从mysql读取数据:

使用 format(xx).option()的方式指定连接数据库的一些参数,比如用户名密码,使用的连接驱动等

import java.util.Properties

import org.apache.spark.sql.SparkSession

object ConnMysql {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    //连接mysql方式1:
    //创建properties配置对象,用于存放连接mysql的参数
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //使用jdbc连接,指定连接字符串,表名,以及其他连接参数,并返回对应的dataframe
    val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
    mysqlDF1.printSchema()
    mysqlDF1.show()
    mysqlDF1.createTempView("customer")
    sparkS.sql("select * from customer limit 2").show()

    //连接mysql方式2,这种方式比较常用:
    val mysqlConn2 = sparkS.read.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")              
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").load()

    mysqlConn2.printSchema()
  }
}

这是两种连接读取数据的方式。

2、jdbc写入数据到mysql

和读取类似,只不过换成了write操作

import java.util.Properties

import org.apache.spark.sql.SparkSession

object WriteToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()

    val df1 = spark.read.text("G://test//t_stu.json")

    //方式1:
    df1.write.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").save()

    //方式2:
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)

  }

}

必须要保证df的表格式和写入的mysql的表格式一样,字段名也要一样

2.7.6 hive

1、通过jdbc连接hive
方式和普通jdbc类似,例如:

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * 连接hive的情况有两种:
  * 1、如果是直接在ideal中运行spark程序的话,则必须在程序中指定jdbc连接的hiveserver的地址
  * 且hiveserver必须以后台服务的形式暴露10000端口出来.这种方式是直接通过jdbc连接hive
  *
  * 2、如果程序是打包到spark集群中运行的话,一般spark集群的conf目录下,已经有hive client
  * 的配置文件,就会直接启动hive client来连接hive。这时不需要启动hiveserver服务。
  * 这种方式是通过hive client连接hive
  */
object ConnHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    val properties = new Properties()
    properties.setProperty("user","")
    properties.setProperty("password","")

    val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
    hiveDF.printSchema()
    spark.stop()
  }
}

这种方式要注意一点:
hiveserver必须以后台服务的形式暴露10000端口出来.这种方式是直接通过jdbc连接hive。

2、通过hive client连接hive
​ 这种方式一般用在生产中,因为任务一般都是通过spark-submit提交到集群中运行,这时候就会直接通过hive client来连接hive,不会通过jdbc来连接了。
​ 要注意:需要在spark的节点上都配置上hive client,然后将hive-site.xml配置文件拷贝到 spark的conf目录下。同时需要将hadoop的core-site.xml hdfs-site.xml也拷贝过去。另外一方面,因为要使用hive client,所以hive server那边,一般都要配置metastore server,具体配置看hive的文章。
​ 这样在spark集群中的程序就可以直接使用

spark.sql("xxxx").show

这样的操作,默认就会从hive中读取对应的表进行操作。不用另外做任何连接hive 的操作

或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:

import org.apache.spark.sql.SparkSession

object ConnHive02 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate()
    spark.sql("select * from customer").show()
  }

}

这样直接操作的就是 hive 的表了

2.8 小案例–读取hive数据分析结果写入mysql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    //直接通过spark集群中的hive client连接hive,不需要jdbc以及hive server
    val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
    val resultDF = spark.sql("select * from default.customer")

    //一般中间写的处理逻辑都是处理从hive读取的数据,处理完成后写入到mysql

    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //通过jdbc写入mysql
  resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)

    spark.stop()
  }

}

三、性能优化

3.1 内存中缓存数据

先启动个spark-shell

spark-shell --master spark://bigdata121:7077

要在spark-shell中操作mysql,所以记得自己找个 mysql-connector的jar放到spark的jars目录下

例子:

创建df,从mysql读取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load()
mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> mysqDF.show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

必须注册成一张表,才可以缓存。
scala> mysqDF.registerTempTable("customer")
warning: there was one deprecation warning; re-run with -deprecation for details

标识这张表可以被缓存,但是现在数据并没有直接缓存
scala> spark.sqlContext.cacheTable("customer")

第一次查询表,从mysql读取数据,并缓存到内存中
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

这一次查询从内存中返回
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

清空缓存
scala> spark.sqlContext.clearCache

3.2 调优相关参数

将数据缓存到内存中的相关优化参数
   spark.sql.inMemoryColumnarStorage.compressed
   默认为 true
   Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。

   spark.sql.inMemoryColumnarStorage.batchSize
   默认值:10000
   缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。

其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
   spark.sql.files.maxPartitionBytes
   默认值:128 MB
   读取文件时单个分区可容纳的最大字节数

   spark.sql.files.openCostInBytes
   默认值:4M
   打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。

spark.sql.autoBroadcastJoinThreshold
   默认值:10M
   用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

spark.sql.shuffle.partitions
   默认值:200
   用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数。

原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/192887.html

(0)
上一篇 2021年11月15日 04:28
下一篇 2021年11月15日 04:28

相关推荐

发表回复

登录后才能评论