学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark
以下对RDD的三种创建方式、单类型RDD基本的transformation api、采样Api以及pipe操作进行了java api方面的阐述
一、RDD的三种创建方式
-
从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下:
//从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从本地文件系统的文件中,注意file:后面肯定是至少三个///,四个也行,不能是两个 //如果指定第二个参数的话,表示创建的RDD的最小的分区数,如果文件分块的数量大于指定的分区 //数的话则已文件的分块数量为准 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt" 2 );
2. 可以经过transformation api从一个已经存在的RDD上创建一个新的RDD,以下是map这个转换api
JavaRDD<String> mapRDD = textFileRDD.map(new Function<String, String>() { @Override public String call(String s) throws Exception { return s + "test"; } }); System.out.println("mapRDD = " + mapRDD.collect());
3. 从一个内存中的列表数据创建一个RDD,可以指定RDD的分区数,如果不指定的话,则取所有Executor的所有cores数量
//创建一个单类型的JavaRDD JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2); System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect()); //创建一个单类型且类型为Double的JavaRDD JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6)); System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect()); //创建一个key-value类型的RDD import scala.Tuple2; JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3))); System.out.println("javaPairRDD = " + javaPairRDD.collect());
注:对于第三种情况,scala中还提供了makeRDD api,这个api可以指定创建RDD每一个分区所在的机器,这个api的原理详见spark core RDD scala api中
二、单类型RDD基本的transformation api
先基于内存中的数据创建一个RDD
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
-
map操作,表示对integerJavaRDD的每一个元素应用我们自定义的函数接口,如下是将每一个元素加1:
JavaRDD<Integer> mapRDD = integerJavaRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer element) throws Exception { return element + 1; } }); //结果:[2, 3, 4, 4] System.out.println("mapRDD = " + mapRDD.collect());
需要注意的是,map操作可以返回与RDD不同类型的数据,如下,返回一个自定义的User对象:
public class User implements Serializable { private String userId; private Integer amount; public User(String userId, Integer amount) { this.userId = userId; this.amount = amount; } //getter setter.... @Override public String toString() { return "User{" + "userId='" + userId + '/'' + ", amount=" + amount + '}'; } } JavaRDD<User> userJavaRDD = integerJavaRDD.map(new Function<Integer, User>() { @Override public User call(Integer element) throws Exception { if (element < 3) { return new User("小于3", 22); } else { return new User("大于3", 23); } } }); //结果:[User{userId='小于3', amount=22}, User{userId='小于3', amount=22}, User{userId='大于3', amount=23}, User{userId='大于3', amount=23}] System.out.println("userJavaRDD = " + userJavaRDD.collect());
2. flatMap操作,对integerJavaRDD的每一个元素应用我们自定义的FlatMapFunction,这个函数的输出是一个数据列表,flatMap会对这些输出的数据列表进行展平
JavaRDD<Integer> flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction<Integer, Integer>() { @Override public Iterator<Integer> call(Integer element) throws Exception { //输出一个list,这个list里的元素是0到element List<Integer> list = new ArrayList<>(); int i = 0; while (i <= element) { list.add(i); i++; } return list.iterator(); } }); //结果: [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3] System.out.println("flatMapJavaRDD = " + flatMapJavaRDD.collect());
3. filter操作,对integerJavaRDD的每一个元素应用我们自定义的过滤函数,过滤掉我们不需要的元素,如下,过滤掉不等于1的元素:
JavaRDD<Integer> filterJavaRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer integer) throws Exception { return integer != 1; } }); //结果为:[2, 3, 3] System.out.println("filterJavaRDD = " + filterJavaRDD.collect());
4. glom操作,查看integerJavaRDD每一个分区对应的元素数据
JavaRDD<List<Integer>> glomRDD = integerJavaRDD.glom(); //结果: [[1, 2], [3, 3]], 说明integerJavaRDD有两个分区,第一个分区中有数据1和2,第二个分区中有数据3和3 System.out.println("glomRDD = " + glomRDD.collect());
5. mapPartitions操作,对integerJavaRDD的每一个分区的数据应用我们自定义的函数接口方法,假设我们需要为每一个元素加上一个初始值,而这个初始值的获取又是非常耗时的,这个时候用mapPartitions会有非常大的优势,如下:
//这是一个初始值获取的方法,是一个比较耗时的方法 public static Integer getInitNumber(String source) { System.out.println("get init number from " + source + ", may be take much time........"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } JavaRDD<Integer> mapPartitionTestRDD = integerJavaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() { @Override public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception { //每一个分区获取一次初始值,integerJavaRDD有两个分区,那么会调用两次getInitNumber方法 //所以对应需要初始化的比较耗时的操作,比如初始化数据库的连接等,一般都是用mapPartitions来为对每一个分区初始化一次,而不要去使用map操作 Integer initNumber = getInitNumber("mapPartitions"); List<Integer> list = new ArrayList<>(); while (integerIterator.hasNext()) { list.add(integerIterator.next() + initNumber); } return list.iterator(); } }); //结果为: [2, 3, 4, 4] System.out.println("mapPartitionTestRDD = " + mapPartitionTestRDD.collect()); JavaRDD<Integer> mapInitNumberRDD = integerJavaRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) throws Exception { //遍历每一个元素的时候都会去获取初始值,这个integerJavaRDD含有4个元素,那么这个getInitNumber方法会被调用4次,严重的影响了时间,不如mapPartitions性能好 Integer initNumber = getInitNumber("map"); return integer + initNumber; } }); //结果为:[2, 3, 4, 4] System.out.println("mapInitNumberRDD = " + mapInitNumberRDD.collect());
6. mapPartitionsWithIndex操作,对integerJavaRDD的每一个分区的数据应用我们自定义的函数接口方法,在应用函数接口方法的时候带上了分区信息,即知道你当前处理的是第几个分区的数据
JavaRDD<Integer> mapPartitionWithIndex = integerJavaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { @Override public Iterator<Integer> call(Integer partitionId, Iterator<Integer> integerIterator) throws Exception { //partitionId表示当前处理的第几个分区的信息 System.out.println("partition id = " + partitionId); List<Integer> list = new ArrayList<>(); while (integerIterator.hasNext()) { list.add(integerIterator.next() + partitionId); } return list.iterator(); } }, false); //结果 [1, 2, 4, 4] System.out.println("mapPartitionWithIndex = " + mapPartitionWithIndex.collect());
三、采样Api
先基于内存中的数据创建一个RDD
JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
-
sample
//第一个参数为withReplacement //如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现 //如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现 //第二个参数为:fraction,表示每一个元素被抽取为样本的概率,并不是表示需要抽取的数据量的因子 //比如从100个数据中抽样,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20个数据, //而是表示100个元素的被抽取为样本概率为0.2;样本的大小并不是固定的,而是服从二项分布 //当withReplacement=true的时候fraction>=0 //当withReplacement=false的时候 0 < fraction < 1 //第三个参数为:reed表示生成随机数的种子,即根据这个reed为rdd的每一个分区生成一个随机种子 JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.5, 100); //结果: [1, 3] System.out.println("sampleRDD = " + sampleRDD.collect());
2. randomSplit
//按照权重对RDD进行随机抽样切分,有几个权重就切分成几个RDD //随机抽样采用伯努利抽样算法实现, 以下是有两个权重,则会切成两个RDD JavaRDD<Integer>[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6}); //结果为2 System.out.println("splitRDDs.length = " + splitRDDs.length); //结果为[2, 3] 结果是不定的 System.out.println("splitRDD(0) = " + splitRDDs[0].collect()); //结果为[1, 3] 结果是不定的 System.out.println("splitRDD(1) = " + splitRDDs[1].collect());
3. takeSample
//随机抽样指定数量的样本数据 //第一个参数为withReplacement //如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现 //如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现 //第二个参数指定多少,则返回多少个样本数 结果为[2, 3] System.out.println(listRDD.takeSample(false, 2));
4. 分层采样,对key-value类型的RDD进行采样
//创建一个key value类型的RDD import scala.Tuple2; JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3), new Tuple2("kkk", 3))); //定义每一个key的采样因子 Map<String, Double> fractions = new HashMap<>(); fractions.put("test", 0.5); fractions.put("kkk", 0.4); //对每一个key进行采样 //结果为 [(test,3), (kkk,3)] //sampleByKey 并不对过滤全量数据,因此只得到近似值 System.out.println(javaPairRDD.sampleByKey(true, fractions).collect()); //结果为 [(test,3), (kkk,3)] //sampleByKeyExtra 会对全量数据做采样计算,因此耗费大量的计算资源,但是结果会更准确。 System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());
抽样的原理详细可以参考:spark core RDD api。这些原理性的东西用文字不太好表述
四、pipe,表示在RDD执行流中的某一步执行其他的脚本,比如python或者shell脚本等
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2); //启动echo.py需要的环境变量 Map<String, String> env = new HashMap<>(); env.put("env", "envtest"); List<String> commands = new ArrayList<>(); commands.add("python"); //如果是在真实的spark集群中,那么要求echo.py在集群的每一台机器的同一个目录下面都要有 commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py"); JavaRDD<String> result = dataRDD.pipe(commands, env); //结果为: [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest] System.out.println(result.collect());
echo.py的内容如下:
import sys import os #input = "test" input = sys.stdin env_keys = os.environ.keys() env = "" if "env" in env_keys: env = os.environ["env"] for ele in input: output = "slave1-" + ele.strip('/n') + "-" + env print (output) input.close
对于pipe的原理,以及怎么实现的,参考:spark core RDD api,这个里面还清楚的讲述了怎么消除手工将脚本拷贝到每一台机器中的工作
系统学习spark:
1、[老汤] Spark 2.x 之精讲Spark Core:https://edu.51cto.com/sd/88429
2、[老汤]Spark 2.x 之精讲Spark SQL专题:https://edu.51cto.com/sd/16f3d
3、[老汤]Scala内功修炼系列专题:https://edu.51cto.com/sd/8e85b
4、[老汤]Spark 2.x之精讲Spark Streamig:https://edu.51cto.com/sd/8c525
5、[老汤]Spark 2.x精讲套餐:https://edu.51cto.com/sd/ff9a4
6、从Scala到Spark 2.x专题:https://edu.51cto.com/sd/d72af
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/194424.html