(1)RDD的介绍
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区、元素可并行计算的集合。
具有数据流的模型的特点,自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示的将工作集缓存在内存中。后续的查询能够重用工作集,这极大地提升了查询速度。
RDD可以从 三方面理解:
– 数据集:RDD是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑视图。从外部看RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。
– 分布式:RDD的数据可能存储在多个节点的磁盘上或者内存中,也就是所谓的多级存储。
– 弹性:虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通 过 repartition 转换操作)并行计算计算单元的划分结构,也就是分区的数量。
总之:RDD就是一个大集合,将所有的数据都加载到内存中,方便多次进行重用。它的数据可以在多个节点上,并且RDD可以保存在内存中,当如果某个阶段的RDD丢失,不需要重新计算,只需要提取上一次的RDD,在相应的计算即可。
(2)RDD的属性
1)A list of partitions(一组分片,数据集的基本单位)
一个分区通常与一个任务向关联,分区的个数决定了并行的粒度。分区的个数可以在创建RDD的时候指定,如果不指定,那么默认的由节点的cores个数决定。最终每一个分区会被映射成为BlockManager 中的一个Block,而这个Block会被下一个task使用进行计算。
2)A function for computing each split(算子)
每一个RDD都会实现compute,用于分区进行计算
3)A list of dependencies on other RDDs(RDD之间的依赖)
RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算。
宽依赖和窄依赖:
窄依赖(完全依赖):一个父分区唯一对应一个子分区,例:map操作
宽依赖(部分依赖):一个父分区对应多个子分区,如:reduce、group操作
区分宽依赖和窄依赖:当前这个算子的执行过程中是否有shuffle操作。
4)Optionally a Partitioner for key-value RDDs(分区函数)
当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于 key-value 的 RDD,才会有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决 定了 parent RDD Shuffle 输出时的分片数量。
5)Optionally a list of preferred locations to compute each split on
一个列表,存储存取每个 Partition 的优先位置(preferred location)。按照”移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。而这个列表中就存放着每个分区的优先位置。
(3)RDD的API(相关算子)
RDD编程中有两种中形式:Transformation(转换)和Action(行动)。
Transformation:表示把一个RDD —->RDD。
Action:表示把RDD—-集合或者scala对象。
1)RDD的创建:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
//由一个已经存在的 Scala 数据集合创建
val arr=Array(1,2,3,4)
val arr1RDD: RDD[Int] = sc.parallelize(arr)
val arr2RDD: RDD[Int] = sc.makeRDD(arr)
//由外部存储系统的数据创建(HDFS、HBase...)
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
}
}
2)Transformation:
官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
注意:RDD中所有的转换(Transformation)都是延迟加载,也就是说,他们并不是直接计算结果,相反的,他们只是记住这些应用到基础数据集,上的一个转换动作,只有当发生一个要求返回一个Driver动作的时候,这些转换才真正运行。
map()算子:
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
/**
* map 算子,返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成
*/
val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))
flatMap()算子:
val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")
val lineRDD: RDD[String] = sc.parallelize(arr)
/**
* flagMap:类似于map,但是每一个元素输入的元素可以被
* 映射成为0个或者多个输出的元素(返回的是一个序列,而不是单一的元素)
*/
//返回一个集合hive hbase hadoop spark hadoop yarn hdfs
val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("//s+"))
filter()算子:
val arr=Array(1,2,3,4,5)
val arrRDD: RDD[Int] = sc.parallelize(arr)
/**
* filter过滤:返回一个新的RDD,该RDD由经过func函数计算后返回
* 值为true的输入元素组成
*/
val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)
mapPartitions()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* mapPartitions与map的唯一区别就是,mapPartitions迭代的是一个分区,
* 而map遍历的每一个元素,mapPartitions参数是一个迭代对象,返回的也是一个迭代对象
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
mapPartitionsWithIndex()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* 第一个参数是分区编号:分区编号是从0开始的不间断的连续编号
* 第二个参数和mapPartitions相同
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {
println(parnum) //分区编号
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
sample()算子:
val list=1 to 5000
/**
* sample方法有三个参数:
* withReplacement:代表是否有放回的抽取(false 不放回,true:放回)
* fraction:抽取样本空间占总体的比例,(以分数的形式) 0<=fraction <=1
* seed:随机数生成器,new Random().nextInt(10),不传表示使用系统的
* 注意:我们使用的sample算子,不能保证提供集合大小就恰巧是rdd.size()*fraction,结果大小会上下浮动
* sample在做抽样调查的时候,特别受用
*/
val listRDD: RDD[Int] = sc.parallelize(list)
val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)
println(sampleRDD.count()) //大概是5000*0.2 上下浮动
groupByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* groupByKey,分组
* 建议groupByKey在实践中,能不用就不用,主要因为groupByKey的效率低,
* 因为有大量的数据在网络中传输,而且还没有进行本地的预处理
* 可以使用reduceByKey或者aggregateByKey或者combineByKey代替这个groupByKey
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分组
val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()
//求平均值
val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {
val avg = score.sum.toDouble / (score.size)
(name, avg)
}
}
reduceByKey算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* reduceByKey:在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据
* 集,key 相同的值,都被使用指定的 reduce 函数聚合
* 到一起。和 groupByKey 类似,任务的个数是可以通过
* 第二个可选参数来配置的。
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分组,求总分
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
sortByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortByKey:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,
* 返回一个按照 key 进行排序的(K,V)的 RDD
*/
//分组,求总分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))
sortRDD.foreach(println)
sortBy算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortBy(func,[ascending], [numTasks])
* 与 sortByKey 类似,但是更灵活
* 第一个参数是根据什么排序
* 第二个是怎么排序,true 正序,false 倒序
* 第三个排序后分区数,默认与原 RDD 一样
*/
//分组,求总分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)
aggregateByKey()算子:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
/**
* aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
* 先按分区聚合再总的聚合,每次要跟初始值交流
* zeroValue:初始值
* seqOp:迭代操作,拿RDD中的每一个元素跟初始值进行合并
* combOp:分区结果的最终合并
* numTasks:分区个数
* aggregate+groupByKey=aggregateByKey
* aggregate对单个值进行RDD,aggregateByKey对(K,V)值进行RDD
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD: RDD[Int] = sc.parallelize(list)
//求平均值
/**
* seqOp: (U, T) => U
* combOp: (U, U) => U
* u:(Int,Int) 总和,总次数
*/
val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {
(u._1 + x, u._2 + 1)
}
, (u1: (Int, Int), u2: (Int, Int)) => {
(u1._1 + u2._1, u1._2 + u2._2)
})
println(result._1 / result._2)
//aggregateByKey已经根据(k,v)k 进行分组,以下的操作,是对v进行操作
//以下操作时求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)
val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
}, (x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
reslutRDD2.foreach(kv=>{
val name=kv._1
val avg=kv._2._1.toDouble/kv._2._2
})
}
}
foldLeft()算子:(不是spark的算子,是scala的高级操作)
/**
* foldLeft
* (zeroValue: T) 初值值
* (B, A) => B B是一个元组,B._1 表示累加元素,B._2 表示个数, A 表示下一个元素
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})
println(result._1.toDouble/result._2)
combineByKey()算子:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
/**
* combineByKey:
* 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,
* b: Int) => a + b, (m: Int, n: Int) => m + n)
*/
//求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list1)
/**
* createCombiner: V => C,
* mergeValue: (C, V) => C,
* mergeCombiners: (C, C) => C): RDD[(K, C)]
*/
val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {
(x, 1)
},
(x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
},
(x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
resultRDD.foreach{case (name,(sum,count))=>{
val avg=sum.toDouble/count
println(s"${name}:${avg}")
}}
}
}
连接操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val arr1 = Array(1, 2, 4, 5)
val arr1RDD = sc.parallelize(arr1)
val arr2 = Array(4, 5, 6, 7)
val arr2RDD = sc.parallelize(arr2)
//cartesian 笛卡尔积
val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)
//union : 连接
val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)
//subtract,求,差集
val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)
//join
val list1 = List(("a", 1), ("b", 2), ("c", 3))
val list1RDD = sc.parallelize(list1)
val list2 = List(("a", "zs"), ("b", "sl"))
val list2RDD = sc.parallelize(list2)
/**
* 根据元组中的key进行join 操作,相同的key向连接
* 返回的是RDD[(String, (Int, String))] (key,连接结果)
*/
val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)
//cogroup
/**
* (String key ,
* (Iterable[Int] arr1中的相应的key所有value的集合
* , Iterable[String])) arr2中的相应的key所有value的集合
*/
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)
}
}
分区操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
/**
* 表示在执行了filter操作之后,由于大量的数据被过滤,导致之前设定的分区task个数,
* 处理剩下的数据导致资源浪费,为了合理高效的利用资源,
* 可以对task重新定义,在coalesce方法中的分区个数一定要小于之前设置的分区个数。
*/
hdfsRDD.coalesce(2)
//打乱数据,重新分区,分区规则为随机分区
hdfsRDD.repartition(3)
//自定义分区规则(注意,只在有key-value的RDD中可以使用)
var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)
("b", 2), ("e", 2)
, ("b", 2)
, ("f", 2), ("g", 2), ("h", 2))
val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)
arrRDD.partitionBy(new MyPartitioner(3))
}
}
class MyPartitioner(val numPTN:Int) extends Partitioner{
//分区个数
override def numPartitions: Int = numPTN
//分区规则
override def getPartition(key: Any): Int = {
val num=key.hashCode()&Integer.MAX_VALUE%numPTN
return num
}
}
总结:
– Transformation返回的仍然是一个RDD
– 它使用了链式调用的设计模式,对一个 RDD 进行计 算后,变换成另外一个 RDD,然后这个 RDD 又可以进行另外一次转换。这个过程是分布式的。
3)Action:
常见操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list)
//action rdd ---map
listRDD.reduceByKeyLocally((x,y)=>x+y)
//调用collect的目的是:触发所有的计算,最终收集当前这个调用者RDD的所有数据,返回到客户端,如果数据量比较大,谨慎使用
listRDD.collect()
//统计RDD中有多少记录
listRDD.count()
//取出RDD中的第一条记录
listRDD.first()
//取出RDD前几条记录
listRDD.take(5)
//随机采样
listRDD.takeSample(false,20)
//按照某种格式,排序后的前几条
listRDD.top(50)
//按照升序或者降序,取相应的条数的记录(其中的元素必须继承Ordered)
listRDD.takeOrdered(3)
//统计每一个key中的value有多少个
listRDD.countByKey()
//统计有多少个元素
listRDD.countByValue()
//遍历RDD中每一个元素
listRDD.foreach(kv=>{})
//分区遍历RDD中的元素
listRDD.foreachPartition(kv=>{})
//将RDD的结果,保存到相应的文件系统中(注意这个目录一定是不存在的目录)
listRDD.saveAsTextFile("/data/output")
}
}
总结:Action返回值不是一个RDD。它要么是一个scala的集合,要么是一个值,要么是空。最终返回到Driver程序,或者把RDD写入到文件系统中。
原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/195862.html