MapRedece(单表关联)详解大数据

源数据:Child–Parent表

Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Marry
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philop Terry
Philop Alma
Mark Terry
Mark Alma

   

 

 

 

 

 

 

 

 

 

 

 

 

目标:表的自连接:从图中可以找出Tom的grandparent为Marry和Ben,同理可以找出其他的人的grandparent

MapRedece(单表关联)详解大数据

 根据Child–Parent表推断grandchild和grandparent

                                左表                                                                        右表

MapRedece(单表关联)详解大数据           MapRedece(单表关联)详解大数据   MapRedece(单表关联)详解大数据

将一张表分解为两张表的连接:从图中可以找出Tom的grandparent为Marry和Ben,同理可以找出其他的人的grandparent

MapRedece(单表关联)详解大数据

思路与步骤:

只有连接 左表的parent列和右表的child列,才能得到grandchild和grandparent的信息。

因此需要将源数据的一张表拆分成两张表,且左表和右表是同一个表,如上图。

  • 所以在map阶段将读入数据分割成child和parent之后,将parent设置成key,child设置成value进行输出,并作为左表;
  • 再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。
  • 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。
  • 这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。
  • reduce接收到连接的结果,其中每个key的value-list就包含了”grandchild–grandparent”关系。
  • 取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,
  • 最后对两个数组求笛卡尔积得到最后的结果

代码1:

  (1)自定义Mapper类    

 1 private static class MyMapper extends Mapper<Object, Text, Text, Text> { 
 2         @Override 
 3         protected void map(Object k1, Text v1, 
 4                 Mapper<Object, Text, Text, Text>.Context context) 
 5                 throws IOException, InterruptedException { 
 6             String childName = new String(); 
 7             String parentName = new String(); 
 8             String relationType = new String(); 
 9             Text k2 = new Text(); 
10             Text v2 = new Text(); 
11             // 輸入一行预处理的文本 
12             StringTokenizer items = new StringTokenizer(v1.toString()); 
13             String[] values = new String[2]; 
14             int i = 0; 
15             while (items.hasMoreTokens()) { 
16                 values[i] = items.nextToken(); 
17                 i++; 
18             } 
19             if (values[0].compareTo("child") != 0) { 
20                 childName = values[0]; 
21                 parentName = values[1]; 
22                 // 输出左表,左表加1的标识 
23                 relationType = "1"; 
24                 k2 = new Text(values[1]); // parent作为key,作为表1的key 
25                 v2 = new Text(relationType + "+" + childName + "+" + parentName);//<1+Lucy+Tom> 
26                 context.write(k2, v2); 
27                 // 输出右表,右表加2的标识 
28                 relationType = "2"; 
29                 k2 = new Text(values[0]);// child作为key,作为表2的key 
30                 v2 = new Text(relationType + "+" + childName + "+" + parentName);//<2+Jone+Lucy> 
31                 context.write(k2, v2); 
32             } 
33         } 
34     }

(2)自定义Reduce

 1 private static class MyReducer extends Reducer<Text, Text, Text, Text> { 
 2         Text k3 = new Text(); 
 3         Text v3 = new Text(); 
 4  
 5         @Override 
 6         protected void reduce(Text k2, Iterable<Text> v2s, 
 7                 Reducer<Text, Text, Text, Text>.Context context) 
 8                 throws IOException, InterruptedException { 
 9             if (0 == time) { 
10                 context.write(new Text("grandchild"), new Text("grandparent")); 
11                 time++; 
12             } 
13             int grandchildnum = 0; 
14             String[] grandchild = new String[10];//孙子 
15             int grandparentnum = 0; 
16             String[] grandparent = new String[10];//爷爷 
17             Iterator items = v2s.iterator();//["1 Tom","2 Mary","2 Ben"] 
18             while (items.hasNext()) { 
19                 String record = items.next().toString(); 
20                 int len = record.length(); 
21                 int i = 2; 
22                 if (0 == len) { 
23                     continue; 
24                 } 
25  
26                 // 取得左右表的标识 
27                 char relationType = record.charAt(0); 
28                 // 定义孩子和父母变量 
29                 String childname = new String(); 
30                 String parentname = new String(); 
31                 // 获取value列表中value的child 
32                 while (record.charAt(i) != '+') { 
33                     childname += record.charAt(i); 
34                     i++; 
35                 } 
36                 i = i + 1; //越过名字之间的“+”加号 
37                 // 获取value列表中value的parent 
38                 while (i < len) { 
39                     parentname += record.charAt(i); 
40                     i++; 
41                 } 
42                 // 左表,取出child放入grandchildren 
43                 if ('1' == relationType) { 
44                     grandchild[grandchildnum] = childname; 
45                     grandchildnum++; 
46                 } 
47                 // 右表,取出parent放入grandparent 
48                 if ('2' == relationType) { 
49                     grandparent[grandparentnum] = parentname; 
50                     grandparentnum++; 
51                 } 
52             } 
53             // grandchild和grandparentnum数组求笛卡尔积 
54             if (0 != grandchildnum && 0 != grandparentnum) { 
55                 for (int i = 0; i < grandchildnum; i++) { 
56                     for (int j = 0; j < grandparentnum; j++) { 
57                         k3 = new Text(grandchild[i]); 
58                         v3 = new Text(grandparent[j]); 
59                         context.write(k3, v3); 
60                     } 
61                 } 
62             } 
63         } 
64     }

(3)Map和Reduce组合

 1     public static void main(String[] args) throws Exception { 
 2         //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定 
 3         //2将自定义的MyMapper和MyReducer组装在一起 
 4         Configuration conf=new Configuration(); 
 5         String jobName=SingleTableLink.class.getSimpleName(); 
 6         //1首先寫job,知道需要conf和jobname在去創建即可 
 7         Job job = Job.getInstance(conf, jobName); 
 8          
 9         //*13最后,如果要打包运行改程序,则需要调用如下行 
10         job.setJarByClass(SingleTableLink.class); 
11          
12         //3读取HDFS內容:FileInputFormat在mapreduce.lib包下 
13         FileInputFormat.setInputPaths(job, new Path(args[0])); 
14         //4指定解析<k1,v1>的类(谁来解析键值对) 
15         //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class 
16         job.setInputFormatClass(TextInputFormat.class); 
17         //5指定自定义mapper类 
18         job.setMapperClass(MyMapper.class); 
19         //6指定map输出的key2的类型和value2的类型  <k2,v2> 
20         //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定 
21         job.setMapOutputKeyClass(Text.class); 
22         job.setMapOutputValueClass(Text.class); 
23         //7分区(默认1个),排序,分组,规约 采用 默认 
24          
25         //接下来采用reduce步骤 
26         //8指定自定义的reduce类 
27         job.setReducerClass(MyReducer.class); 
28         //9指定输出的<k3,v3>类型 
29         job.setOutputKeyClass(Text.class); 
30         job.setOutputValueClass(Text.class); 
31         //10指定输出<K3,V3>的类 
32         //*下面这一步可以省 
33         job.setOutputFormatClass(TextOutputFormat.class); 
34         //11指定输出路径 
35         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
36          
37         //12写的mapreduce程序要交给resource manager运行 
38         job.waitForCompletion(true); 
39     }

 所有源代码:

MapRedece(单表关联)详解大数据

  1 package Mapreduce; 
  2  
  3 import java.io.IOException; 
  4 import java.util.Iterator; 
  5 import java.util.StringTokenizer; 
  6  
  7 import org.apache.hadoop.conf.Configuration; 
  8 import org.apache.hadoop.fs.Path; 
  9 import org.apache.hadoop.io.Text; 
 10 import org.apache.hadoop.mapreduce.Job; 
 11 import org.apache.hadoop.mapreduce.Mapper; 
 12 import org.apache.hadoop.mapreduce.Reducer; 
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 17  
 18 public class SingleTableLink { 
 19     private static int time = 0; 
 20  
 21     public static void main(String[] args) throws Exception { 
 22         //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定 
 23         //2将自定义的MyMapper和MyReducer组装在一起 
 24         Configuration conf=new Configuration(); 
 25         String jobName=SingleTableLink.class.getSimpleName(); 
 26         //1首先寫job,知道需要conf和jobname在去創建即可 
 27         Job job = Job.getInstance(conf, jobName); 
 28          
 29         //*13最后,如果要打包运行改程序,则需要调用如下行 
 30         job.setJarByClass(SingleTableLink.class); 
 31          
 32         //3读取HDFS內容:FileInputFormat在mapreduce.lib包下 
 33         FileInputFormat.setInputPaths(job, new Path(args[0])); 
 34         //4指定解析<k1,v1>的类(谁来解析键值对) 
 35         //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class 
 36         job.setInputFormatClass(TextInputFormat.class); 
 37         //5指定自定义mapper类 
 38         job.setMapperClass(MyMapper.class); 
 39         //6指定map输出的key2的类型和value2的类型  <k2,v2> 
 40         //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定 
 41         job.setMapOutputKeyClass(Text.class); 
 42         job.setMapOutputValueClass(Text.class); 
 43         //7分区(默认1个),排序,分组,规约 采用 默认 
 44          
 45         //接下来采用reduce步骤 
 46         //8指定自定义的reduce类 
 47         job.setReducerClass(MyReducer.class); 
 48         //9指定输出的<k3,v3>类型 
 49         job.setOutputKeyClass(Text.class); 
 50         job.setOutputValueClass(Text.class); 
 51         //10指定输出<K3,V3>的类 
 52         //*下面这一步可以省 
 53         job.setOutputFormatClass(TextOutputFormat.class); 
 54         //11指定输出路径 
 55         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
 56          
 57         //12写的mapreduce程序要交给resource manager运行 
 58         job.waitForCompletion(true); 
 59     } 
 60  
 61     private static class MyMapper extends Mapper<Object, Text, Text, Text> { 
 62         @Override 
 63         protected void map(Object k1, Text v1, 
 64                 Mapper<Object, Text, Text, Text>.Context context) 
 65                 throws IOException, InterruptedException { 
 66             String childName = new String(); 
 67             String parentName = new String(); 
 68             String relationType = new String(); 
 69             Text k2 = new Text(); 
 70             Text v2 = new Text(); 
 71             // 輸入一行预处理的文本 
 72             StringTokenizer items = new StringTokenizer(v1.toString()); 
 73             String[] values = new String[2]; 
 74             int i = 0; 
 75             while (items.hasMoreTokens()) { 
 76                 values[i] = items.nextToken(); 
 77                 i++; 
 78             } 
 79             if (values[0].compareTo("child") != 0) { 
 80                 childName = values[0]; 
 81                 parentName = values[1]; 
 82                 // 输出左表,左表加1的标识 
 83                 relationType = "1"; 
 84                 k2 = new Text(values[1]); // parent作为key,作为表1的key 
 85                 v2 = new Text(relationType + "+" + childName + "+" + parentName);//<1+Lucy+Tom> 
 86                 context.write(k2, v2); 
 87                 // 输出右表,右表加2的标识 
 88                 relationType = "2"; 
 89                 k2 = new Text(values[0]);// child作为key,作为表2的key 
 90                 v2 = new Text(relationType + "+" + childName + "+" + parentName);//<2+Jone+Lucy> 
 91                 context.write(k2, v2); 
 92             } 
 93         } 
 94     } 
 95  
 96     private static class MyReducer extends Reducer<Text, Text, Text, Text> { 
 97         Text k3 = new Text(); 
 98         Text v3 = new Text(); 
 99  
100         @Override 
101         protected void reduce(Text k2, Iterable<Text> v2s, 
102                 Reducer<Text, Text, Text, Text>.Context context) 
103                 throws IOException, InterruptedException { 
104             if (0 == time) { 
105                 context.write(new Text("grandchild"), new Text("grandparent")); 
106                 time++; 
107             } 
108             int grandchildnum = 0; 
109             String[] grandchild = new String[10];//孙子 
110             int grandparentnum = 0; 
111             String[] grandparent = new String[10];//爷爷 
112             Iterator items = v2s.iterator();//["1 Tom","2 Mary","2 Ben"] 
113             while (items.hasNext()) { 
114                 String record = items.next().toString(); 
115                 int len = record.length(); 
116                 int i = 2; 
117                 if (0 == len) { 
118                     continue; 
119                 } 
120  
121                 // 取得左右表的标识 
122                 char relationType = record.charAt(0); 
123                 // 定义孩子和父母变量 
124                 String childname = new String(); 
125                 String parentname = new String(); 
126                 // 获取value列表中value的child 
127                 while (record.charAt(i) != '+') { 
128                     childname += record.charAt(i); 
129                     i++; 
130                 } 
131                 i = i + 1; //越过名字之间的“+”加号 
132                 // 获取value列表中value的parent 
133                 while (i < len) { 
134                     parentname += record.charAt(i); 
135                     i++; 
136                 } 
137                 // 左表,取出child放入grandchildren 
138                 if ('1' == relationType) { 
139                     grandchild[grandchildnum] = childname; 
140                     grandchildnum++; 
141                 } 
142                 // 右表,取出parent放入grandparent 
143                 if ('2' == relationType) { 
144                     grandparent[grandparentnum] = parentname; 
145                     grandparentnum++; 
146                 } 
147             } 
148             // grandchild和grandparentnum数组求笛卡尔积 
149             if (0 != grandchildnum && 0 != grandparentnum) { 
150                 for (int i = 0; i < grandchildnum; i++) { 
151                     for (int j = 0; j < grandparentnum; j++) { 
152                         k3 = new Text(grandchild[i]); 
153                         v3 = new Text(grandparent[j]); 
154                         context.write(k3, v3); 
155                     } 
156                 } 
157             } 
158         } 
159     } 
160  
161 }

代码1单表关联

代码2:参考的代码

MapRedece(单表关联)详解大数据

  1 package Mapreduce; 
  2  
  3 import java.io.IOException; 
  4 import java.util.ArrayList; 
  5 import java.util.Iterator; 
  6 import java.util.List; 
  7 import org.apache.hadoop.conf.Configuration; 
  8 import org.apache.hadoop.fs.Path; 
  9 import org.apache.hadoop.io.Text; 
 10 import org.apache.hadoop.mapreduce.Job; 
 11 import org.apache.hadoop.mapreduce.Mapper; 
 12 import org.apache.hadoop.mapreduce.Reducer; 
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 17  
 18 public class SingleTableLink2 { 
 19  
 20     public static void main(String[] args) throws Exception { 
 21         // 必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定 
 22         // 2将自定义的MyMapper和MyReducer组装在一起 
 23         Configuration conf = new Configuration(); 
 24         String jobName = SingleTableLink2.class.getSimpleName(); 
 25         // 1首先寫job,知道需要conf和jobname在去創建即可 
 26         Job job = Job.getInstance(conf, jobName); 
 27  
 28         // *13最后,如果要打包运行改程序,则需要调用如下行 
 29         job.setJarByClass(SingleTableLink2.class); 
 30  
 31         // 3读取HDFS內容:FileInputFormat在mapreduce.lib包下 
 32         FileInputFormat.setInputPaths(job, new Path(args[0])); 
 33         // 4指定解析<k1,v1>的类(谁来解析键值对) 
 34         // *指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class 
 35         job.setInputFormatClass(TextInputFormat.class); 
 36         // 5指定自定义mapper类 
 37         job.setMapperClass(MyMapper.class); 
 38         // 6指定map输出的key2的类型和value2的类型 <k2,v2> 
 39         // *下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定 
 40         job.setMapOutputKeyClass(Text.class); 
 41         job.setMapOutputValueClass(Text.class); 
 42         // 7分区(默认1个),排序,分组,规约 采用 默认 
 43  
 44         // 接下来采用reduce步骤 
 45         // 8指定自定义的reduce类 
 46         job.setReducerClass(MyReducer.class); 
 47         // 9指定输出的<k3,v3>类型 
 48         job.setOutputKeyClass(Text.class); 
 49         job.setOutputValueClass(Text.class); 
 50         // 10指定输出<K3,V3>的类 
 51         // *下面这一步可以省 
 52         job.setOutputFormatClass(TextOutputFormat.class); 
 53         // 11指定输出路径 
 54         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
 55  
 56         // 12写的mapreduce程序要交给resource manager运行 
 57         job.waitForCompletion(true); 
 58     } 
 59  
 60     private static class MyMapper extends Mapper<Object, Text, Text, Text> { 
 61         @Override 
 62         protected void map(Object k1, Text v1, 
 63                 Mapper<Object, Text, Text, Text>.Context context) 
 64                 throws IOException, InterruptedException { 
 65             String childName = new String(); 
 66             String parentName = new String(); 
 67             String relationType = new String(); 
 68             Text k2 = new Text(); 
 69             Text v2 = new Text(); 
 70             // 輸入一行预处理的文本 
 71             String line = v1.toString(); 
 72             String[] values = line.split("/t"); 
 73             if (values.length >= 2) { 
 74                 if (values[0].compareTo("child") != 0) { 
 75                     childName = values[0]; 
 76                     parentName = values[1]; 
 77                     // 输出左表,左表加1的标识 
 78                     relationType = "1"; 
 79                     k2 = new Text(parentName); // parent作为key,作为表1的key 
 80                     v2 = new Text(relationType + " " + childName);// <"Lucy","1 Tom"> 
 81                     context.write(k2, v2); 
 82                     // 输出右表,右表加2的标识 
 83                     relationType = "2"; 
 84                     k2 = new Text(childName);// child作为key,作为表2的key 
 85                     v2 = new Text(relationType + " " + parentName);// //<"Jone","2 Lucy"> 
 86                     context.write(k2, v2); 
 87                 } 
 88             } 
 89         } 
 90     } 
 91  
 92     private static class MyReducer extends Reducer<Text, Text, Text, Text> { 
 93  
 94         @Override 
 95         protected void reduce(Text key, Iterable<Text> values, Context context) 
 96                 throws IOException, InterruptedException { 
 97             List<String> grandChild = new ArrayList<String>();// 孙子 
 98             List<String> grandParent = new ArrayList<String>();// 爷爷 
 99             Iterator<Text> it = values.iterator();// ["1 Tom","2 Mary","2 Ben"] 
100             while (it.hasNext()) { 
101                 String[] record = it.next().toString().split(" ");// "1 Tom"---[1,Tom] 
102                 if (record.length == 0) 
103                     continue; 
104                 if (record[0].equals("1")) {// 左表,取出child放入grandchild 
105                     grandChild.add(record[1]); 
106                 } else if (record[0].equals("2")) {// 右表,取出parent放入grandParent 
107                     grandParent.add(record[1]); 
108                 } 
109             } 
110             // grandchild 和 grandparent数组求笛卡尔积 
111             if (grandChild.size() != 0 && grandParent.size() != 0) { 
112                 for (int i = 0; i < grandChild.size(); i++) { 
113                     for (int j = 0; j < grandParent.size(); j++) { 
114                         context.write(new Text(grandChild.get(i)), new Text( 
115                                 grandParent.get(j))); 
116                     } 
117                 } 
118             } 
119         } 
120     } 
121  
122 }

代码2单表关联代码

 

代码运行:

(1)准备数据

[[email protected] filecontent]# vi child_parent
Tom  Lucy
Tom  Jack
Jone   Lucy
Jone    Jack
Lucy    Mary
Lucy   Ben
Jack    Alice
Jack    Jesses
Terry    Alice
Terry    Jesses
Philip    Terry
Philip    Alma
Mark    Terry
Mark    Alma

(以/t分隔)

 MapRedece(单表关联)详解大数据

(2)执行jar包

[[email protected] filecontent]# hadoop jar SingleTableLink2.jar /neusoft/child_parent  /out13  

 MapRedece(单表关联)详解大数据

(3)查看运行结果是否正确

[[email protected] filecontent]# hadoop dfs -text /out13/part-r-00000

 MapRedece(单表关联)详解大数据

备注:(1)如果显示的多一个+,加号,需要检查程序,在下面两个循环之间加移位操作。

// 获取value列表中value的child 32 while (record.charAt(i) != ‘+’) { 33 childname += record.charAt(i); 34 i++; 35 } 36 i = i + 1; //越过名字之间的“+”加号 37 // 获取value列表中value的parent 38 while (i < len) { 39 parentname += record.charAt(i); 40 i++; 41 }

    MapRedece(单表关联)详解大数据

(2)补充点:

charAt

charAt(int index)方法是一个能够用来检索特定索引下的字符的String实例的方法.

charAt()方法返回指定索引位置的char值。索引范围为0~length()-1.

如: str.charAt(0)检索str中的第一个字符,str.charAt(str.length()-1)检索最后一个字符.

StringTokenizer是一个用来分隔String的应用类,相当于VB的split函数。

1.构造函数

public StringTokenizer(String str)

public StringTokenizer(String str, String delim)

public StringTokenizer(String str, String delim, boolean returnDelims)

第一个参数就是要分隔的String,第二个是分隔字符集合,第三个参数表示分隔符号是否作为标记返回,如果不指定分隔字符,默认的是:”/t/n/r/f”

2.核心方法

public boolean hasMoreTokens()

public String nextToken()

public String nextToken(String delim)

public int countTokens()

其实就是三个方法,返回分隔字符块的时候也可以指定分割符,而且以后都是采用最后一次指定的分隔符号。

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9085.html

(0)
上一篇 2021年7月19日 09:11
下一篇 2021年7月19日 09:11

相关推荐

发表回复

登录后才能评论