学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark
本文详细介绍了 Spark 中 key-value 类型 RDD 的 Java API
一、key-value类型的RDD的创建方式
1、sparkContext.parallelizePairs
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3))); //结果:[(test,3), (kkk,3)] System.out.println("javaPairRDD = " + javaPairRDD.collect());
2、keyBy的方式
public class User implements Serializable { private String userId; private Integer amount; public User(String userId, Integer amount) { this.userId = userId; this.amount = amount; } @Override public String toString() { return "User{" + "userId='" + userId + '/'' + ", amount=" + amount + '}'; } } JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20))); JavaPairRDD<String, User> userJavaPairRDD = userJavaRDD.keyBy(new Function<User, String>() { @Override public String call(User user) throws Exception { return user.getUserId(); } }); //结果:[(u1,User{userId='u1', amount=20})] System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());
3、zip的方式
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); //两个rdd zip也是创建key-value类型RDD的一种方式 JavaPairRDD<Integer, Integer> zipPairRDD = rdd.zip(rdd); //结果:[(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)] System.out.println("zipPairRDD = " + zipPairRDD.collect());
4、groupBy的方式
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() { @Override public Boolean call(Integer x) throws Exception { return x % 2 == 0; } }; //将偶数和奇数分组,生成key-value类型的RDD JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven); //结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])] System.out.println("oddsAndEvens = " + oddsAndEvens.collect()); //结果:1 System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size()); oddsAndEvens = rdd.groupBy(isEven, 2); //结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])] System.out.println("oddsAndEvens = " + oddsAndEvens.collect()); //结果:2 System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
二、combineByKey
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数 Function<Integer, Tuple2<Integer, Integer>> createCombiner = new Function<Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Integer value) throws Exception { return new Tuple2<>(value, 1); } }; //当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数 Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception { return new Tuple2<>(acc._1() + value, acc._2() + 1); } }; //当需要对不同分区的数据进行聚合的时候应用这个函数 Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception { return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); } }; JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners); //结果:[(coffee,(12,3)), (panda,(3,1))] System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
combineByKey的数据流如下:
关于combineByKey原理的详细讲解,请参见:spark core RDD api原理详解
三、aggregateByKey
JavaPairRDD<String, Tuple2<Integer, Integer>> aggregateByKeyRDD = javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners); //结果:[(coffee,(12,3)), (panda,(3,1))] System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect()); //aggregateByKey是由combineByKey实现的,上面的aggregateByKey就是等于下面的combineByKeyRDD Function<Integer, Tuple2<Integer, Integer>> createCombinerAggregateByKey = new Function<Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> call(Integer value) throws Exception { return mergeValue.call(new Tuple2<>(0, 0), value); } }; //结果是: [(coffee,(12,3)), (panda,(3,1))] System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());
四、reduceByKey
JavaPairRDD<String, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); //结果:[(coffee,12), (panda,3)] System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect()); //reduceByKey底层也是combineByKey实现的,上面的reduceByKey等于下面的combineByKey Function<Integer, Integer> createCombinerReduce = new Function<Integer, Integer>() { @Override public Integer call(Integer integer) throws Exception { return integer; } }; Function2<Integer, Integer, Integer> mergeValueReduce = new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }; //结果:[(coffee,12), (panda,3)] System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());
五、foldByKey
JavaPairRDD<String, Integer> foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); //结果:[(coffee,12), (panda,3)] System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect()); //foldByKey底层也是combineByKey实现的,上面的foldByKey等于下面的combineByKey Function2<Integer, Integer, Integer> mergeValueFold = new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }; Function<Integer, Integer> createCombinerFold = new Function<Integer, Integer>() { @Override public Integer call(Integer integer) throws Exception { return mergeValueFold.call(0, integer); } }; //结果:[(coffee,12), (panda,3)] System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());
六、groupByKey
JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(); //结果:[(coffee,[1, 2, 9]), (panda,[3])] System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect()); //groupByKey底层也是combineByKey实现的,上面的groupByKey等于下面的combineByKey Function<Integer, List<Integer>> createCombinerGroup = new Function<Integer, List<Integer>>() { @Override public List<Integer> call(Integer integer) throws Exception { List<Integer> list = new ArrayList<>(); list.add(integer); return list; } }; Function2<List<Integer>, Integer, List<Integer>> mergeValueGroup = new Function2<List<Integer>, Integer, List<Integer>>() { @Override public List<Integer> call(List<Integer> integers, Integer integer) throws Exception { integers.add(integer); return integers; } }; Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombinersGroup = new Function2<List<Integer>, List<Integer>, List<Integer>>() { @Override public List<Integer> call(List<Integer> integers, List<Integer> integers2) throws Exception { integers.addAll(integers2); return integers; } }; //结果:[(coffee,[1, 2, 9]), (panda,[3])] System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());
API原理性的东西很难用文档说明清楚,如果想更深入、更透彻地理解API的原理,可以参考:spark core RDD api原理详解
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/195055.html