MapReduce分组详解大数据

  •  分组:相同key的value进行分组

 例子:如下输入输出,右边的第一列没有重复值,第二列取得是当第一列相同时第二例取最大值

          MapReduce分组详解大数据

分析:首先确定<k3,v3>,k3的选择两种方式,

方法1.前两列都作为k3

方法2.两列分别是k3和v3,此种情况的k2和v2分别是那些,第一列为k2,第二列为v2,但是最后如何无法转化为k3,v3呢,思路是从v2s中取值最大的,此种情况不能取值。

第一部分:方法二达到任务目的

(1)自定义Mapper

 1 private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{ 
 2     IntWritable k2= new IntWritable(); 
 3     IntWritable v2= new IntWritable(); 
 4     @Override 
 5     protected void map(LongWritable key, Text value, 
 6             Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) 
 7             throws IOException, InterruptedException { 
 8            String[] splited = value.toString().split("/t"); 
 9            k2.set(Integer.parseInt(splited[0])); 
10            v2.set(Integer.parseInt(splited[1])); 
11            context.write(k2, v2); 
12     } 
13 }

(2)自定义Reduce

//按照k2進行排序,分組(分为3各组,reduce函数被调用3次,分几组调用几次)
//分组为3-{3,2,1}, 2-{2,1},1-{1}

 1 private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ 
 2     IntWritable v3 = new IntWritable(); 
 3     @Override 
 4     protected void reduce(IntWritable k2, Iterable<IntWritable> v2s, 
 5             Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) 
 6             throws IOException, InterruptedException { 
 7         int max=Integer.MIN_VALUE; 
 8         for (IntWritable v2 : v2s) { 
 9             if (v2.get()>max) { 
10                 max=v2.get(); 
11             } 
12         } 
13         //每个组求得一个最大值可得到结果的序列 
14         v3.set(max); 
15         context.write(k2, v3); 
16     } 
17 }

(3)组合MapReduce

 1 public static void main(String[] args) throws Exception { 
 2     Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName()); 
 3     job.setJarByClass(GroupTest.class); 
 4     //1.自定义输入路径 
 5     FileInputFormat.setInputPaths(job, new Path(args[0])); 
 6     //2.自定义mapper 
 7     //job.setInputFormatClass(TextInputFormat.class); 
 8     job.setMapperClass(MyMapper.class); 
 9     //job.setMapOutputKeyClass(Text.class); 
10     //job.setMapOutputValueClass(TrafficWritable.class); 
11      
12     //3.自定义reduce 
13     job.setReducerClass(MyReducer.class); 
14     job.setOutputKeyClass(IntWritable.class); 
15     job.setOutputValueClass(IntWritable.class); 
16     //4.自定义输出路径 
17     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
18     //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘 
19      
20     job.waitForCompletion(true); 
21 }

由此,完整的代码如下:

MapReduce分组详解大数据

 1 package Mapreduce; 
 2  
 3 import java.io.IOException; 
 4  
 5 import org.apache.hadoop.conf.Configuration; 
 6 import org.apache.hadoop.fs.Path; 
 7 import org.apache.hadoop.io.IntWritable; 
 8 import org.apache.hadoop.io.LongWritable; 
 9 import org.apache.hadoop.io.Text; 
10 import org.apache.hadoop.mapreduce.Job; 
11 import org.apache.hadoop.mapreduce.Mapper; 
12 import org.apache.hadoop.mapreduce.Reducer; 
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
15  
16 public class GroupTest { 
17 public static void main(String[] args) throws Exception { 
18     Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName()); 
19     job.setJarByClass(GroupTest.class); 
20     //1.自定义输入路径 
21     FileInputFormat.setInputPaths(job, new Path(args[0])); 
22     //2.自定义mapper 
23     //job.setInputFormatClass(TextInputFormat.class); 
24     job.setMapperClass(MyMapper.class); 
25     //job.setMapOutputKeyClass(Text.class); 
26     //job.setMapOutputValueClass(TrafficWritable.class); 
27      
28     //3.自定义reduce 
29     job.setReducerClass(MyReducer.class); 
30     job.setOutputKeyClass(IntWritable.class); 
31     job.setOutputValueClass(IntWritable.class); 
32     //4.自定义输出路径 
33     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
34     //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘 
35      
36     job.waitForCompletion(true); 
37 } 
38 private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{ 
39     IntWritable k2= new IntWritable(); 
40     IntWritable v2= new IntWritable(); 
41     @Override 
42     protected void map(LongWritable key, Text value, 
43             Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) 
44             throws IOException, InterruptedException { 
45            String[] splited = value.toString().split("/t"); 
46            k2.set(Integer.parseInt(splited[0])); 
47            v2.set(Integer.parseInt(splited[1])); 
48            context.write(k2, v2); 
49     } 
50 } 
51 //按照k2進行排序,分組(分为3各组,reduce函数被调用3次,分几组调用几次) 
52 //分组为3-{3,2,1}, 2-{2,1},1-{1} 
53 private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ 
54     IntWritable v3 = new IntWritable(); 
55     @Override 
56     protected void reduce(IntWritable k2, Iterable<IntWritable> v2s, 
57             Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) 
58             throws IOException, InterruptedException { 
59         int max=Integer.MIN_VALUE; 
60         for (IntWritable v2 : v2s) { 
61             if (v2.get()>max) { 
62                 max=v2.get(); 
63             } 
64         } 
65         //每个组求得一个最大值可得到结果的序列 
66         v3.set(max); 
67         context.write(k2, v3); 
68     } 
69 } 
70 }

最值得MapReduce代码

(4)测试代码运行结果

  •   [[email protected] filecontent]# hadoop jar GroupTest.jar /neusoft/twoint  /out9 
  •   [[email protected] filecontent]# hadoop jar -text  /out9/part-r-00000
  •   [[email protected] filecontent]# hadoop dfs -text  /out9/part-r-00000  

       MapReduce分组详解大数据

第二部分:方法一达到任务目的

      前两列都作为k3,无v3,由此类推,k2也是前两列

      但是如果采用默认分组的话,上述数据集分为6组,无法达到同样的数值取得最大值的目的。

      由此,利用Mapreduce的自定义分组规则,使得第一列相同的数值可以在一个组里面,从而正确的分组。

      MapReduce提供了job.setGroupingComparatorClass(cls);其中cls是自定义分组的类

      MapReduce分组详解大数据

      (1) 从源代码可知,该类需要继承RawComparator类,自定义分组代码如下:

 1 //分组比较--自定义分组 
 2     private static class MyGroupingComparator implements RawComparator { 
 3         public int compare(Object o1, Object o2) { 
 4             return 0;//默认的比较方法 
 5         } 
 6         //byte[] b1 表示第一个参数的输入字节表示,byte[] b2表示第二个参数的输入字节表示 
 7         //b1 The first byte array. 第一个字节数组, 
 8         //b1表示前8个字节,b2表示后8个字节,字节是按次序依次比较的 
 9         //s1 The position index in b1. The object under comparison's starting index.第一列开始位置 
10         //l1 The length of the object in b1.第一列长度 ,在这里表示长度8 
11         //提供的数据集中的k2一共48个字节,k2的每一行的TwoInt类型表示8字节(t1和t2分别为4字节) 
12         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
13             //compareBytes是按字节比较的方法,其中k2表示的是两列,第一列比较,第二例不比较 
14             //第一个字节数组的前四个字节和第二个字节数组的前四个字节比较 
15             //{3,3},{3,2},{3,1},{2,2},{2,1},{1,1} 
16             //比较上述六组的每组的第一个数字,也就是比较twoint中得t1数值 
17             //现在就根据t1可分成3个组了{3,(3,2,1)}{2,(2,1)}{1,1} 
18             //之后再从v2中取出最大值 
19             return WritableComparator.compareBytes(b1, s1, l1-4, b2, s2, l2-4); 
20         } 
21  
22     }

      (2)主函数中调用      

//当没有下面的自定义分组的话,会调用k2的compareto方法执行k2的比较,如果自定义了分组类则使用自定义分组类 
job.setGroupingComparatorClass(MyGroupingComparator.class);

     (3)根据比较函数个字段的含义,可以得到v2的类型为intwritable,而不是nullwritable,v2是由第二列的数组成的集合

      Mapper函数如下:

 1 private static class MyMapper extends 
 2             Mapper<LongWritable, Text, TwoInt, IntWritable> { 
 3         //这里的v2需要改为IntWritable而不是nullwritable 
 4         TwoInt K2 = new TwoInt(); 
 5         IntWritable v2= new IntWritable(); 
 6         @Override 
 7         protected void map(LongWritable key, Text value, 
 8                 Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context) 
 9                 throws IOException, InterruptedException { 
10             String[] splited = value.toString().split("/t"); 
11             K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1])); 
12             v2.set(Integer.parseInt(splited[1])); //要比较第二列,需要将第二列的值赋值为v2 
13             context.write(K2, v2); 
14         } 
15     }

     (4)k3和v3的类型为reduce输出的类型,均为intwritable类型,但是如何根据得到的v2去统计其中相同key的value中得最大值呢?

 1 private static class MyReducer extends 
 2             Reducer<TwoInt, IntWritable, IntWritable, IntWritable> {//k2,v2s,k3,v3 
 3         IntWritable k3 = new IntWritable(); 
 4         IntWritable v3 = new IntWritable(); 
 5         @Override 
 6         protected void reduce( 
 7                 TwoInt k2, 
 8                 Iterable<IntWritable> v2s, 
 9                 Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context) 
10                 throws IOException, InterruptedException { 
11             int max=Integer.MIN_VALUE; 
12             for (IntWritable v2 : v2s) { 
13                 if (v2.get()>max) { 
14                     max=v2.get(); 
15                 } 
16             } 
17             //每个组求得一个最大值可得到结果的序列 
18             v3.set(max); 
19             k3.set(k2.t1);//k2的第一列作为k3,因为k2为Twoint类型 
20             context.write(k3,v3); 
21         } 
22     }

最终的代码如下:

MapReduce分组详解大数据

  1 package Mapreduce; 
  2  
  3 import java.io.DataInput; 
  4 import java.io.DataOutput; 
  5 import java.io.IOException; 
  6  
  7 import org.apache.hadoop.conf.Configuration; 
  8 import org.apache.hadoop.fs.Path; 
  9 import org.apache.hadoop.io.IntWritable; 
 10 import org.apache.hadoop.io.LongWritable; 
 11 import org.apache.hadoop.io.NullWritable; 
 12 import org.apache.hadoop.io.RawComparator; 
 13 import org.apache.hadoop.io.Text; 
 14 import org.apache.hadoop.io.WritableComparable; 
 15 import org.apache.hadoop.io.WritableComparator; 
 16 import org.apache.hadoop.mapreduce.Job; 
 17 import org.apache.hadoop.mapreduce.Mapper; 
 18 import org.apache.hadoop.mapreduce.Reducer; 
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 21  
 22  
 23 public class Group2Test { 
 24     public static void main(String[] args) throws Exception { 
 25         Job job = Job.getInstance(new Configuration(), 
 26                 Group2Test.class.getSimpleName()); 
 27         job.setJarByClass(Group2Test.class); 
 28         // 1.自定义输入路径 
 29         FileInputFormat.setInputPaths(job, new Path(args[0])); 
 30         // 2.自定义mapper 
 31         job.setMapperClass(MyMapper.class); 
 32         //这里的k2,v2和k3,v3不同,需要显式定义k2和v2类型 
 33         job.setMapOutputKeyClass(TwoInt.class);   
 34         job.setMapOutputValueClass(IntWritable.class); 
 35  
 36         //当没有下面的自定义分组的话,会调用k2的compareto方法执行k2的比较,如果自定义了分组类则使用自定义分组类 
 37         job.setGroupingComparatorClass(MyGroupingComparator.class); 
 38  
 39         // 3.自定义reduce 
 40         job.setReducerClass(MyReducer.class); 
 41         job.setOutputKeyClass(IntWritable.class); 
 42         job.setOutputValueClass(IntWritable.class); 
 43         // 4.自定义输出路径 
 44         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
 45  
 46         job.waitForCompletion(true); 
 47     } 
 48     //分组比较--自定义分组 
 49     private static class MyGroupingComparator implements RawComparator { 
 50         public int compare(Object o1, Object o2) { 
 51             return 0;//默认的比较方法 
 52         } 
 53         //byte[] b1 表示第一个参数的输入字节表示,byte[] b2表示第二个参数的输入字节表示 
 54         //b1 The first byte array. 第一个字节数组, 
 55         //b1表示前8个字节,b2表示后8个字节,字节是按次序依次比较的 
 56         //s1 The position index in b1. The object under comparison's starting index.第一列开始位置 
 57         //l1 The length of the object in b1.第一列长度 ,在这里表示长度8 
 58         //提供的数据集中的k2一共48个字节,k2的每一行的TwoInt类型表示8字节(t1和t2分别为4字节) 
 59         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
 60             //compareBytes是按字节比较的方法,其中k2表示的是两列,第一列比较,第二例不比较 
 61             //第一个字节数组的前四个字节和第二个字节数组的前四个字节比较 
 62             //{3,3},{3,2},{3,1},{2,2},{2,1},{1,1} 
 63             //比较上述六组的每组的第一个数字,也就是比较twoint中得t1数值 
 64             //现在就根据t1可分成3个组了{3,(3,2,1)}{2,(2,1)}{1,1} 
 65             //之后再从v2中取出最大值 
 66             return WritableComparator.compareBytes(b1, s1, l1-4, b2, s2, l2-4); 
 67         } 
 68  
 69     } 
 70  
 71     private static class MyMapper extends 
 72             Mapper<LongWritable, Text, TwoInt, IntWritable> { 
 73         //这里的v2需要改为IntWritable而不是nullwritable 
 74         TwoInt K2 = new TwoInt(); 
 75         IntWritable v2= new IntWritable(); 
 76         @Override 
 77         protected void map(LongWritable key, Text value, 
 78                 Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context) 
 79                 throws IOException, InterruptedException { 
 80             String[] splited = value.toString().split("/t"); 
 81             K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1])); 
 82             v2.set(Integer.parseInt(splited[1])); 
 83             context.write(K2, v2); 
 84         } 
 85     } 
 86  
 87     private static class MyReducer extends 
 88             Reducer<TwoInt, IntWritable, IntWritable, IntWritable> {//k2,v2s,k3,v3 
 89         IntWritable k3 = new IntWritable(); 
 90         IntWritable v3 = new IntWritable(); 
 91         @Override 
 92         protected void reduce( 
 93                 TwoInt k2, 
 94                 Iterable<IntWritable> v2s, 
 95                 Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context) 
 96                 throws IOException, InterruptedException { 
 97             int max=Integer.MIN_VALUE; 
 98             for (IntWritable v2 : v2s) { 
 99                 if (v2.get()>max) { 
100                     max=v2.get(); 
101                 } 
102             } 
103             //每个组求得一个最大值可得到结果的序列 
104             v3.set(max); 
105             k3.set(k2.t1);//k2的第一列作为k3,因为k2为Twoint类型 
106             context.write(k3,v3); 
107         } 
108     } 
109  
110     private static class TwoInt implements WritableComparable<TwoInt> { 
111         public int t1; 
112         public int t2; 
113  
114         public void write(DataOutput out) throws IOException { 
115             out.writeInt(t1); 
116             out.writeInt(t2); 
117         } 
118  
119         public void set(int t1, int t2) { 
120             this.t1 = t1; 
121             this.t2 = t2; 
122         } 
123  
124         public void readFields(DataInput in) throws IOException { 
125             this.t1 = in.readInt(); 
126             this.t2 = in.readInt(); 
127         } 
128  
129         public int compareTo(TwoInt o) { 
130             if (this.t1 == o.t1) { // 當第一列相等的時候,第二列升序排列 
131                 return this.t2 - o.t2; 
132             } 
133             return this.t1 - o.t1;// 當第一列不相等的時候,按第一列升序排列 
134         } 
135         @Override 
136         public String toString() { 
137             return t1 + "/t" + t2; 
138         } 
139     } 
140 }

方法1求最值

测试并运行结果如下:

[[email protected] filecontent]# hadoop dfs -text  /out9/part-r-00000

MapReduce分组详解大数据

 [[email protected] filecontent]# hadoop dfs -text  /out10/part-r-00000

 MapReduce分组详解大数据

结果是正确无误的。

 

 END~

 

       

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9090.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论