MapReduce Source Code Walkthrough (Part 1)

/** 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */ 
 
package org.apache.hadoop.mapreduce.lib.input; 
 
import org.apache.hadoop.classification.InterfaceAudience; 
import org.apache.hadoop.classification.InterfaceStability; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.CompressionCodecFactory; 
import org.apache.hadoop.io.compress.SplittableCompressionCodec; 
import org.apache.hadoop.mapreduce.InputFormat; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
 
import com.google.common.base.Charsets; 
 
/** An {@link InputFormat} for plain text files.  Files are broken into lines. 
 * Either linefeed or carriage-return are used to signal end of line.  Keys are 
 * the position in the file, and values are the line of text.. */ 
@InterfaceAudience.Public 
@InterfaceStability.Stable 
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { 
 
  @Override 
  public RecordReader<LongWritable, Text>  
    createRecordReader(InputSplit split, 
                       TaskAttemptContext context) { 
    String delimiter = context.getConfiguration().get( 
        "textinputformat.record.delimiter"); 
    byte[] recordDelimiterBytes = null; 
    if (null != delimiter) 
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); 
    return new LineRecordReader(recordDelimiterBytes); 
  } 
 
  @Override 
  protected boolean isSplitable(JobContext context, Path file) { 
    final CompressionCodec codec = 
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file); 
    if (null == codec) { 
      return true; 
    } 
    return codec instanceof SplittableCompressionCodec; 
  } 
 
}

TextInputFormat

Its parent class (TextInputFormat itself turns each line of input into a key/value pair; for example, the line "hello you" at byte offset 0 becomes the pair (0, hello you)):

  1 /** 
  2  * Licensed to the Apache Software Foundation (ASF) under one 
  3  * or more contributor license agreements.  See the NOTICE file 
  4  * distributed with this work for additional information 
  5  * regarding copyright ownership.  The ASF licenses this file 
  6  * to you under the Apache License, Version 2.0 (the 
  7  * "License"); you may not use this file except in compliance 
  8  * with the License.  You may obtain a copy of the License at 
  9  * 
 10  *     http://www.apache.org/licenses/LICENSE-2.0 
 11  * 
 12  * Unless required by applicable law or agreed to in writing, software 
 13  * distributed under the License is distributed on an "AS IS" BASIS, 
 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  * See the License for the specific language governing permissions and 
 16  * limitations under the License. 
 17  */ 
 18  
 19 package org.apache.hadoop.mapreduce.lib.input; 
 20  
 21 import java.io.IOException; 
 22 import java.util.ArrayList; 
 23 import java.util.List; 
 24  
 25 import org.apache.commons.logging.Log; 
 26 import org.apache.commons.logging.LogFactory; 
 27 import org.apache.hadoop.classification.InterfaceAudience; 
 28 import org.apache.hadoop.classification.InterfaceStability; 
 29 import org.apache.hadoop.conf.Configuration; 
 30 import org.apache.hadoop.fs.FileStatus; 
 31 import org.apache.hadoop.fs.FileSystem; 
 32 import org.apache.hadoop.fs.LocatedFileStatus; 
 33 import org.apache.hadoop.fs.Path; 
 34 import org.apache.hadoop.fs.PathFilter; 
 35 import org.apache.hadoop.fs.BlockLocation; 
 36 import org.apache.hadoop.fs.RemoteIterator; 
 37 import org.apache.hadoop.mapred.LocatedFileStatusFetcher; 
 38 import org.apache.hadoop.mapred.SplitLocationInfo; 
 39 import org.apache.hadoop.mapreduce.InputFormat; 
 40 import org.apache.hadoop.mapreduce.InputSplit; 
 41 import org.apache.hadoop.mapreduce.Job; 
 42 import org.apache.hadoop.mapreduce.JobContext; 
 43 import org.apache.hadoop.mapreduce.Mapper; 
 44 import org.apache.hadoop.mapreduce.security.TokenCache; 
 45 import org.apache.hadoop.util.ReflectionUtils; 
 46 import org.apache.hadoop.util.StringUtils; 
 47  
 48 import com.google.common.base.Stopwatch; 
 49 import com.google.common.collect.Lists; 
 50  
 51 /**  
 52  * A base class for file-based {@link InputFormat}s. 
 53  *  
 54  * <p><code>FileInputFormat</code> is the base class for all file-based  
 55  * <code>InputFormat</code>s. This provides a generic implementation of 
 56  * {@link #getSplits(JobContext)}. 
 57  * Subclasses of <code>FileInputFormat</code> can also override the  
 58  * {@link #isSplitable(JobContext, Path)} method to ensure input-files are 
 59  * not split-up and are processed as a whole by {@link Mapper}s. 
 60  */ 
 61 @InterfaceAudience.Public 
 62 @InterfaceStability.Stable 
 63 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> { 
 64   public static final String INPUT_DIR =  
 65     "mapreduce.input.fileinputformat.inputdir"; 
 66   public static final String SPLIT_MAXSIZE =  
 67     "mapreduce.input.fileinputformat.split.maxsize"; 
 68   public static final String SPLIT_MINSIZE =  
 69     "mapreduce.input.fileinputformat.split.minsize"; 
 70   public static final String PATHFILTER_CLASS =  
 71     "mapreduce.input.pathFilter.class"; 
 72   public static final String NUM_INPUT_FILES = 
 73     "mapreduce.input.fileinputformat.numinputfiles"; 
 74   public static final String INPUT_DIR_RECURSIVE = 
 75     "mapreduce.input.fileinputformat.input.dir.recursive"; 
 76   public static final String LIST_STATUS_NUM_THREADS = 
 77       "mapreduce.input.fileinputformat.list-status.num-threads"; 
 78   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1; 
 79  
 80   private static final Log LOG = LogFactory.getLog(FileInputFormat.class); 
 81  
 82   private static final double SPLIT_SLOP = 1.1;   // 10% slop 
 83    
 84   @Deprecated 
 85   public static enum Counter {  
 86     BYTES_READ 
 87   } 
 88  
 89   private static final PathFilter hiddenFileFilter = new PathFilter(){ 
 90       public boolean accept(Path p){ 
 91         String name = p.getName();  
 92         return !name.startsWith("_") && !name.startsWith(".");  
 93       } 
 94     };  
 95  
 96   /** 
 97    * Proxy PathFilter that accepts a path only if all filters given in the 
 98    * constructor do. Used by the listPaths() to apply the built-in 
 99    * hiddenFileFilter together with a user provided one (if any). 
100    */ 
101   private static class MultiPathFilter implements PathFilter { 
102     private List<PathFilter> filters; 
103  
104     public MultiPathFilter(List<PathFilter> filters) { 
105       this.filters = filters; 
106     } 
107  
108     public boolean accept(Path path) { 
109       for (PathFilter filter : filters) { 
110         if (!filter.accept(path)) { 
111           return false; 
112         } 
113       } 
114       return true; 
115     } 
116   } 
117    
118   /** 
119    * @param job 
120    *          the job to modify 
121    * @param inputDirRecursive 
122    */ 
123   public static void setInputDirRecursive(Job job, 
124       boolean inputDirRecursive) { 
125     job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, 
126         inputDirRecursive); 
127   } 
128   
129   /** 
130    * @param job 
131    *          the job to look at. 
132    * @return should the files to be read recursively? 
133    */ 
134   public static boolean getInputDirRecursive(JobContext job) { 
135     return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE, 
136         false); 
137   } 
138  
139   /** 
140    * Get the lower bound on split size imposed by the format. 
141    * @return the number of bytes of the minimal split for this format 
142    */ 
143   protected long getFormatMinSplitSize() { 
144     return 1; 
145   } 
146  
147   /** 
148    * Is the given filename splitable? Usually, true, but if the file is 
149    * stream compressed, it will not be. 
150    *  
151    * <code>FileInputFormat</code> implementations can override this and return 
152    * <code>false</code> to ensure that individual input files are never split-up 
153    * so that {@link Mapper}s process entire files. 
154    *  
155    * @param context the job context 
156    * @param filename the file name to check 
157    * @return is this file splitable? 
158    */ 
159   protected boolean isSplitable(JobContext context, Path filename) { 
160     return true; 
161   } 
162  
163   /** 
164    * Set a PathFilter to be applied to the input paths for the map-reduce job. 
165    * @param job the job to modify 
166    * @param filter the PathFilter class use for filtering the input paths. 
167    */ 
168   public static void setInputPathFilter(Job job, 
169                                         Class<? extends PathFilter> filter) { 
170     job.getConfiguration().setClass(PATHFILTER_CLASS, filter,  
171                                     PathFilter.class); 
172   } 
173  
174   /** 
175    * Set the minimum input split size 
176    * @param job the job to modify 
177    * @param size the minimum size 
178    */ 
179   public static void setMinInputSplitSize(Job job, 
180                                           long size) { 
181     job.getConfiguration().setLong(SPLIT_MINSIZE, size); 
182   } 
183  
184   /** 
185    * Get the minimum split size 
186    * @param job the job 
187    * @return the minimum number of bytes that can be in a split 
188    */ 
189   public static long getMinSplitSize(JobContext job) { 
190     return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); 
191   } 
192  
193   /** 
194    * Set the maximum split size 
195    * @param job the job to modify 
196    * @param size the maximum split size 
197    */ 
198   public static void setMaxInputSplitSize(Job job, 
199                                           long size) { 
200     job.getConfiguration().setLong(SPLIT_MAXSIZE, size); 
201   } 
202  
203   /** 
204    * Get the maximum split size. 
205    * @param context the job to look at. 
206    * @return the maximum number of bytes a split can include 
207    */ 
208   public static long getMaxSplitSize(JobContext context) { 
209     return context.getConfiguration().getLong(SPLIT_MAXSIZE,  
210                                               Long.MAX_VALUE); 
211   } 
212  
213   /** 
214    * Get a PathFilter instance of the filter set for the input paths. 
215    * 
216    * @return the PathFilter instance set for the job, NULL if none has been set. 
217    */ 
218   public static PathFilter getInputPathFilter(JobContext context) { 
219     Configuration conf = context.getConfiguration(); 
220     Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null, 
221         PathFilter.class); 
222     return (filterClass != null) ? 
223         (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null; 
224   } 
225  
226   /** List input directories. 
227    * Subclasses may override to, e.g., select only files matching a regular 
228    * expression.  
229    *  
230    * @param job the job to list input paths for 
231    * @return array of FileStatus objects 
232    * @throws IOException if zero items. 
233    */ 
234   protected List<FileStatus> listStatus(JobContext job 
235                                         ) throws IOException { 
236     Path[] dirs = getInputPaths(job); 
237     if (dirs.length == 0) { 
238       throw new IOException("No input paths specified in job"); 
239     } 
240      
241     // get tokens for all the required FileSystems.. 
242     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,  
243                                         job.getConfiguration()); 
244  
245     // Whether we need to recursive look into the directory structure 
246     boolean recursive = getInputDirRecursive(job); 
247  
248     // creates a MultiPathFilter with the hiddenFileFilter and the 
249     // user provided one (if any). 
250     List<PathFilter> filters = new ArrayList<PathFilter>(); 
251     filters.add(hiddenFileFilter); 
252     PathFilter jobFilter = getInputPathFilter(job); 
253     if (jobFilter != null) { 
254       filters.add(jobFilter); 
255     } 
256     PathFilter inputFilter = new MultiPathFilter(filters); 
257      
258     List<FileStatus> result = null; 
259  
260     int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, 
261         DEFAULT_LIST_STATUS_NUM_THREADS); 
262     Stopwatch sw = new Stopwatch().start(); 
263     if (numThreads == 1) { 
264       result = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
265     } else { 
266       Iterable<FileStatus> locatedFiles = null; 
267       try { 
268         LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( 
269             job.getConfiguration(), dirs, recursive, inputFilter, true); 
270         locatedFiles = locatedFileStatusFetcher.getFileStatuses(); 
271       } catch (InterruptedException e) { 
272         throw new IOException("Interrupted while getting file statuses"); 
273       } 
274       result = Lists.newArrayList(locatedFiles); 
275     } 
276      
277     sw.stop(); 
278     if (LOG.isDebugEnabled()) { 
279       LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis()); 
280     } 
281     LOG.info("Total input paths to process : " + result.size());  
282     return result; 
283   } 
284  
285   private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs, 
286       PathFilter inputFilter, boolean recursive) throws IOException { 
287     List<FileStatus> result = new ArrayList<FileStatus>(); 
288     List<IOException> errors = new ArrayList<IOException>(); 
289     for (int i=0; i < dirs.length; ++i) { 
290       Path p = dirs[i]; 
291       FileSystem fs = p.getFileSystem(job.getConfiguration());  
292       FileStatus[] matches = fs.globStatus(p, inputFilter); 
293       if (matches == null) { 
294         errors.add(new IOException("Input path does not exist: " + p)); 
295       } else if (matches.length == 0) { 
296         errors.add(new IOException("Input Pattern " + p + " matches 0 files")); 
297       } else { 
298         for (FileStatus globStat: matches) { 
299           if (globStat.isDirectory()) { 
300             RemoteIterator<LocatedFileStatus> iter = 
301                 fs.listLocatedStatus(globStat.getPath()); 
302             while (iter.hasNext()) { 
303               LocatedFileStatus stat = iter.next(); 
304               if (inputFilter.accept(stat.getPath())) { 
305                 if (recursive && stat.isDirectory()) { 
306                   addInputPathRecursively(result, fs, stat.getPath(), 
307                       inputFilter); 
308                 } else { 
309                   result.add(stat); 
310                 } 
311               } 
312             } 
313           } else { 
314             result.add(globStat); 
315           } 
316         } 
317       } 
318     } 
319  
320     if (!errors.isEmpty()) { 
321       throw new InvalidInputException(errors); 
322     } 
323     return result; 
324   } 
325    
326   /** 
327    * Add files in the input path recursively into the results. 
328    * @param result 
329    *          The List to store all files. 
330    * @param fs 
331    *          The FileSystem. 
332    * @param path 
333    *          The input path. 
334    * @param inputFilter 
335    *          The input filter that can be used to filter files/dirs.  
336    * @throws IOException 
337    */ 
338   protected void addInputPathRecursively(List<FileStatus> result, 
339       FileSystem fs, Path path, PathFilter inputFilter)  
340       throws IOException { 
341     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); 
342     while (iter.hasNext()) { 
343       LocatedFileStatus stat = iter.next(); 
344       if (inputFilter.accept(stat.getPath())) { 
345         if (stat.isDirectory()) { 
346           addInputPathRecursively(result, fs, stat.getPath(), inputFilter); 
347         } else { 
348           result.add(stat); 
349         } 
350       } 
351     } 
352   } 
353    
354    
355   /** 
356    * A factory that makes the split for this class. It can be overridden 
357    * by sub-classes to make sub-types 
358    */ 
359   protected FileSplit makeSplit(Path file, long start, long length,  
360                                 String[] hosts) { 
361     return new FileSplit(file, start, length, hosts); 
362   } 
363    
364   /** 
365    * A factory that makes the split for this class. It can be overridden 
366    * by sub-classes to make sub-types 
367    */ 
368   protected FileSplit makeSplit(Path file, long start, long length,  
369                                 String[] hosts, String[] inMemoryHosts) { 
370     return new FileSplit(file, start, length, hosts, inMemoryHosts); 
371   } 
372  
373   /**  
374    * Generate the list of files and make them into FileSplits. 
375    * @param job the job context 
376    * @throws IOException 
377    */ 
378   public List<InputSplit> getSplits(JobContext job) throws IOException { 
379     Stopwatch sw = new Stopwatch().start(); 
380     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 
381     long maxSize = getMaxSplitSize(job); 
382  
383     // generate splits 
384     List<InputSplit> splits = new ArrayList<InputSplit>(); 
385     List<FileStatus> files = listStatus(job); 
386     for (FileStatus file: files) { 
387       Path path = file.getPath(); 
388       long length = file.getLen(); 
389       if (length != 0) { 
390         BlockLocation[] blkLocations; 
391         if (file instanceof LocatedFileStatus) { 
392           blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
393         } else { 
394           FileSystem fs = path.getFileSystem(job.getConfiguration()); 
395           blkLocations = fs.getFileBlockLocations(file, 0, length); 
396         } 
397         if (isSplitable(job, path)) { 
398           long blockSize = file.getBlockSize(); 
399           long splitSize = computeSplitSize(blockSize, minSize, maxSize); 
400  
401           long bytesRemaining = length; 
402           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
403             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
404             splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
405                         blkLocations[blkIndex].getHosts(), 
406                         blkLocations[blkIndex].getCachedHosts())); 
407             bytesRemaining -= splitSize; 
408           } 
409  
410           if (bytesRemaining != 0) { 
411             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
412             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
413                        blkLocations[blkIndex].getHosts(), 
414                        blkLocations[blkIndex].getCachedHosts())); 
415           } 
416         } else { // not splitable 
417           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), 
418                       blkLocations[0].getCachedHosts())); 
419         } 
420       } else {  
421         //Create empty hosts array for zero length files 
422         splits.add(makeSplit(path, 0, length, new String[0])); 
423       } 
424     } 
425     // Save the number of input files for metrics/loadgen 
426     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 
427     sw.stop(); 
428     if (LOG.isDebugEnabled()) { 
429       LOG.debug("Total # of splits generated by getSplits: " + splits.size() 
430           + ", TimeTaken: " + sw.elapsedMillis()); 
431     } 
432     return splits; 
433   } 
434  
435   protected long computeSplitSize(long blockSize, long minSize, 
436                                   long maxSize) { 
437     return Math.max(minSize, Math.min(maxSize, blockSize)); 
438   } 
439  
440   protected int getBlockIndex(BlockLocation[] blkLocations,  
441                               long offset) { 
442     for (int i = 0 ; i < blkLocations.length; i++) { 
443       // is the offset inside this block? 
444       if ((blkLocations[i].getOffset() <= offset) && 
445           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ 
446         return i; 
447       } 
448     } 
449     BlockLocation last = blkLocations[blkLocations.length -1]; 
450     long fileLength = last.getOffset() + last.getLength() -1; 
451     throw new IllegalArgumentException("Offset " + offset +  
452                                        " is outside of file (0.." + 
453                                        fileLength + ")"); 
454   } 
455  
456   /** 
457    * Sets the given comma separated paths as the list of inputs  
458    * for the map-reduce job. 
459    *  
460    * @param job the job 
461    * @param commaSeparatedPaths Comma separated paths to be set as  
462    *        the list of inputs for the map-reduce job. 
463    */ 
464   public static void setInputPaths(Job job,  
465                                    String commaSeparatedPaths 
466                                    ) throws IOException { 
467     setInputPaths(job, StringUtils.stringToPath( 
468                         getPathStrings(commaSeparatedPaths))); 
469   } 
470  
471   /** 
472    * Add the given comma separated paths to the list of inputs for 
473    *  the map-reduce job. 
474    *  
475    * @param job The job to modify 
476    * @param commaSeparatedPaths Comma separated paths to be added to 
477    *        the list of inputs for the map-reduce job. 
478    */ 
479   public static void addInputPaths(Job job,  
480                                    String commaSeparatedPaths 
481                                    ) throws IOException { 
482     for (String str : getPathStrings(commaSeparatedPaths)) { 
483       addInputPath(job, new Path(str)); 
484     } 
485   } 
486  
487   /** 
488    * Set the array of {@link Path}s as the list of inputs 
489    * for the map-reduce job. 
490    *  
491    * @param job The job to modify  
492    * @param inputPaths the {@link Path}s of the input directories/files  
493    * for the map-reduce job. 
494    */  
495   public static void setInputPaths(Job job,  
496                                    Path... inputPaths) throws IOException { 
497     Configuration conf = job.getConfiguration(); 
498     Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]); 
499     StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString())); 
500     for(int i = 1; i < inputPaths.length;i++) { 
501       str.append(StringUtils.COMMA_STR); 
502       path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]); 
503       str.append(StringUtils.escapeString(path.toString())); 
504     } 
505     conf.set(INPUT_DIR, str.toString()); 
506   } 
507  
508   /** 
509    * Add a {@link Path} to the list of inputs for the map-reduce job. 
510    *  
511    * @param job The {@link Job} to modify 
512    * @param path {@link Path} to be added to the list of inputs for  
513    *            the map-reduce job. 
514    */ 
515   public static void addInputPath(Job job,  
516                                   Path path) throws IOException { 
517     Configuration conf = job.getConfiguration(); 
518     path = path.getFileSystem(conf).makeQualified(path); 
519     String dirStr = StringUtils.escapeString(path.toString()); 
520     String dirs = conf.get(INPUT_DIR); 
521     conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr); 
522   } 
523    
524   // This method escapes commas in the glob pattern of the given paths. 
525   private static String[] getPathStrings(String commaSeparatedPaths) { 
526     int length = commaSeparatedPaths.length(); 
527     int curlyOpen = 0; 
528     int pathStart = 0; 
529     boolean globPattern = false; 
530     List<String> pathStrings = new ArrayList<String>(); 
531      
532     for (int i=0; i<length; i++) { 
533       char ch = commaSeparatedPaths.charAt(i); 
534       switch(ch) { 
535         case '{' : { 
536           curlyOpen++; 
537           if (!globPattern) { 
538             globPattern = true; 
539           } 
540           break; 
541         } 
542         case '}' : { 
543           curlyOpen--; 
544           if (curlyOpen == 0 && globPattern) { 
545             globPattern = false; 
546           } 
547           break; 
548         } 
549         case ',' : { 
550           if (!globPattern) { 
551             pathStrings.add(commaSeparatedPaths.substring(pathStart, i)); 
552             pathStart = i + 1 ; 
553           } 
554           break; 
555         } 
556         default: 
557           continue; // nothing special to do for this character 
558       } 
559     } 
560     pathStrings.add(commaSeparatedPaths.substring(pathStart, length)); 
561      
562     return pathStrings.toArray(new String[0]); 
563   } 
564    
565   /** 
566    * Get the list of input {@link Path}s for the map-reduce job. 
567    *  
568    * @param context The job 
569    * @return the list of input {@link Path}s for the map-reduce job. 
570    */ 
571   public static Path[] getInputPaths(JobContext context) { 
572     String dirs = context.getConfiguration().get(INPUT_DIR, ""); 
573     String [] list = StringUtils.split(dirs); 
574     Path[] result = new Path[list.length]; 
575     for (int i = 0; i < list.length; i++) { 
576       result[i] = new Path(StringUtils.unEscapeString(list[i])); 
577     } 
578     return result; 
579   } 
580  
581 }

FileInputFormat

Its parent class:

/** 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */ 
 
package org.apache.hadoop.mapreduce; 
 
import java.io.IOException; 
import java.util.List; 
 
import org.apache.hadoop.classification.InterfaceAudience; 
import org.apache.hadoop.classification.InterfaceStability; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 
/**  
 * <code>InputFormat</code> describes the input-specification for a  
 * Map-Reduce job.  
 *  
 * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the 
 * job to:<p> 
 * <ol> 
 *   <li> 
 *   Validate the input-specification of the job.  
 *   <li> 
 *   Split-up the input file(s) into logical {@link InputSplit}s, each of  
 *   which is then assigned to an individual {@link Mapper}. 
 *   </li> 
 *   <li> 
 *   Provide the {@link RecordReader} implementation to be used to glean 
 *   input records from the logical <code>InputSplit</code> for processing by  
 *   the {@link Mapper}. 
 *   </li> 
 * </ol> 
 *  
 * <p>The default behavior of file-based {@link InputFormat}s, typically  
 * sub-classes of {@link FileInputFormat}, is to split the  
 * input into <i>logical</i> {@link InputSplit}s based on the total size, in  
 * bytes, of the input files. However, the {@link FileSystem} blocksize of   
 * the input files is treated as an upper bound for input splits. A lower bound  
 * on the split size can be set via  
 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize"> 
 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 
 *  
 * <p>Clearly, logical splits based on input-size is insufficient for many  
 * applications since record boundaries are to respected. In such cases, the 
 * application has to also implement a {@link RecordReader} on whom lies the 
 * responsibility to respect record-boundaries and present a record-oriented 
 * view of the logical <code>InputSplit</code> to the individual task. 
 * 
 * @see InputSplit 
 * @see RecordReader 
 * @see FileInputFormat 
 */ 
@InterfaceAudience.Public 
@InterfaceStability.Stable 
public abstract class InputFormat<K, V> { 
 
  /**  
   * Logically split the set of input files for the job.   
   *  
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper} 
   * for processing.</p> 
   * 
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the 
   * input files are not physically split into chunks. For e.g. a split could 
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat 
   * also creates the {@link RecordReader} to read the {@link InputSplit}. 
   *  
   * @param context job configuration. 
   * @return an array of {@link InputSplit}s for the job. 
   */ 
  public abstract  
    List<InputSplit> getSplits(JobContext context 
                               ) throws IOException, InterruptedException; 
   
  /** 
   * Create a record reader for a given split. The framework will call 
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before 
   * the split is used. 
   * @param split the split to be read 
   * @param context the information about the task 
   * @return a new record reader 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract  
    RecordReader<K,V> createRecordReader(InputSplit split, 
                                         TaskAttemptContext context 
                                        ) throws IOException,  
                                                 InterruptedException; 
 
}

The InputFormat source

   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   *
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

  In other words: each file is logically divided into a number of splits (by the getSplits() method), and each split is handed to one mapper task.

 /** 
   * Create a record reader for a given split. The framework will call 
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before 
   * the split is used. 
   * @param split the split to be read 
   * @param context the information about the task 
   * @return a new record reader 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract  
    RecordReader<K,V> createRecordReader(InputSplit split, 
                                         TaskAttemptContext context 
                                        ) throws IOException,  
                                                 InterruptedException; 
 
} 

  In other words: a split is essentially a slice of the file's content, and a RecordReader turns that content into key/value pairs. Opening RecordReader shows that this abstract class breaks the data into key/value pairs whose purpose is to feed the Mapper:

/**
* The record reader breaks the data into key/value pairs for input to the
 * {@link Mapper}.
* @param <KEYIN>
* @param <VALUEIN>
*/

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { 
 
  /** 
   * Called once at initialization. 
   * @param split the split that defines the range of records to read 
   * @param context the information about the task 
   * @throws IOException 
   * @throws InterruptedException 
   */ 

  To summarize the source analysis so far:

                  file ----(getSplits())----> InputSplit(s) ----(processed by a RecordReader, created by createRecordReader())----> map(k1, v1)

Part 1: splitting the files  

   Question 1: how is a file cut into splits? Look at the getSplits() method in the subclass FileInputFormat:

  /**  
   * Generate the list of files and make them into FileSplits. 
   * @param job the job context 
   * @throws IOException 
   */ 
  public List<InputSplit> getSplits(JobContext job) throws IOException { 
    Stopwatch sw = new Stopwatch().start(); 
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 
    long maxSize = getMaxSplitSize(job); 
 
    // generate splits 
    List<InputSplit> splits = new ArrayList<InputSplit>(); 
    List<FileStatus> files = listStatus(job); 
    for (FileStatus file: files) { 
      Path path = file.getPath(); 
      long length = file.getLen(); 
      if (length != 0) { 
        BlockLocation[] blkLocations; 
        if (file instanceof LocatedFileStatus) { 
          blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
        } else { 
          FileSystem fs = path.getFileSystem(job.getConfiguration()); 
          blkLocations = fs.getFileBlockLocations(file, 0, length); 
        } 
        if (isSplitable(job, path)) { 
          long blockSize = file.getBlockSize(); 
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); 
 
          long bytesRemaining = length; 
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
            splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                        blkLocations[blkIndex].getHosts(), 
                        blkLocations[blkIndex].getCachedHosts())); 
            bytesRemaining -= splitSize; 
          } 
 
          if (bytesRemaining != 0) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                       blkLocations[blkIndex].getHosts(), 
                       blkLocations[blkIndex].getCachedHosts())); 
          } 
        } else { // not splitable 
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), 
                      blkLocations[0].getCachedHosts())); 
        } 
      } else {  
        //Create empty hosts array for zero length files 
        splits.add(makeSplit(path, 0, length, new String[0])); 
      } 
    } 
    // Save the number of input files for metrics/loadgen 
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 
    sw.stop(); 
    if (LOG.isDebugEnabled()) { 
      LOG.debug("Total # of splits generated by getSplits: " + splits.size() 
          + ", TimeTaken: " + sw.elapsedMillis()); 
    } 
    return splits; 
  }

getSplits

  /**
   * Generate the list of files and make them into FileSplits.   <-- this is where the files get cut into splits
   * @param job the job context
   * @throws IOException
   */

(1) Each split is added to the result list with splits.add(...).

(2) What gets added is produced by the makeSplit(...) factory method, which performs the logical cut.

(3) makeSplit(...) in turn just constructs a FileSplit.

(4) The FileSplit arguments mean the following: start is the byte offset at which the split begins, length is how many bytes it covers, and hosts is the list of nodes holding the block that contains those bytes. A split therefore only records "process length bytes starting at start, and that data lives on these block nodes": it is a logical split.

Nothing is physically cut, so generating splits never actually reads the data from disk; the bytes are read later through the normal HDFS read path.
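To make the three arguments concrete, here is an illustration of my own (hypothetical path and host names, not taken from the article) of what the second logical split of a 300 MB file could look like:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitIllustration {
  public static void main(String[] args) {
    Path path = new Path("hdfs:///demo/input/big.log");        // hypothetical input file
    String[] hosts = {"datanode1", "datanode2", "datanode3"};  // hypothetical nodes holding the block
    // A FileSplit only records where to read: the file, a start offset, a length, and the
    // hosts of the corresponding block. Nothing is physically copied or cut.
    FileSplit second = new FileSplit(path, 128L * 1024 * 1024, 128L * 1024 * 1024, hosts);
    System.out.println(second);   // prints something like <path>:134217728+134217728
  }
}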

 (5) How the code proceeds when the file length is not zero:

if (length != 0) { 
        BlockLocation[] blkLocations; 
        if (file instanceof LocatedFileStatus) { 
          blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
        } else { 
          FileSystem fs = path.getFileSystem(job.getConfiguration()); 
          blkLocations = fs.getFileBlockLocations(file, 0, length); 
        } 
        if (isSplitable(job, path)) { // only if the file can be split -- not every file can be
                                      // (e.g. encrypted files or non-splittable compressed data);
                                      // the file's format decides whether splitting is possible
          long blockSize = file.getBlockSize(); 
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); 
 
          long bytesRemaining = length; 
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
            splits.add(makeSplit(path, length-bytesRemaining, splitSize, // length-bytesRemaining is the start offset
                                                                         // of this split (the bytes already assigned
                                                                         // to earlier splits)
                       blkLocations[blkIndex].getHosts(), 
                       blkLocations[blkIndex].getCachedHosts())); 
            bytesRemaining -= splitSize; 
          }
  1. Suppose the file is 300 bytes: length = 300, bytesRemaining = 300, splitSize = 128.
  2. 300 / 128 ≈ 2.34 > SPLIT_SLOP, so the first makeSplit(0, 128) is created, cutting at splitSize = 128.
  3. bytesRemaining = 300 - 128 = 172.
  4. 172 / 128 ≈ 1.34 > SPLIT_SLOP, so the second makeSplit(300 - 172 = 128, 128) is created.
  5. bytesRemaining = 172 - 128 = 44; since 44 / 128 ≈ 0.34 ≤ SPLIT_SLOP, the while loop exits.
  6. Because bytesRemaining != 0, a final makeSplit(300 - 44 = 256, 44) covers the 44-byte tail (the sketch below replays this arithmetic).
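A minimal stand-alone sketch (plain Java, not Hadoop code) that replays the arithmetic above for a hypothetical 300-byte file:

public class SplitArithmeticDemo {
  public static void main(String[] args) {
    final double SPLIT_SLOP = 1.1;          // same 10% slop as FileInputFormat
    long length = 300, splitSize = 128;     // hypothetical file length and split size
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split: start=%d length=%d%n", length - bytesRemaining, splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {              // the tail that is smaller than 1.1 * splitSize
      System.out.printf("split: start=%d length=%d%n", length - bytesRemaining, bytesRemaining);
    }
    // prints: (0,128), (128,128), (256,44)
  }
}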

If the file may not be split, the following branch runs instead and the whole file becomes a single split:

} else { // not splitable 
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), 
                      blkLocations[0].getCachedHosts())); 
        } 

  (6) How the split size is determined

380   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // works out to 1L by default
381   long maxSize = getMaxSplitSize(job);                                     // defaults to Long.MAX_VALUE


    Looking at how the variable minSize is computed:

   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 

  Clicking into getFormatMinSplitSize() shows that it simply returns 1L:

  /** 
   * Get the lower bound on split size imposed by the format. 
   * @return the number of bytes of the minimal split for this format 
   */ 
  protected long getFormatMinSplitSize() { 
    return 1; 
  } 

  Clicking into getMinSplitSize() shows that it returns the configured minimum split size; if the SPLIT_MINSIZE parameter (mapreduce.input.fileinputformat.split.minsize) is not set, it falls back to 1L:

  /** 
   * Get the minimum split size 
   * @param job the job 
   * @return the minimum number of bytes that can be in a split 
   */ 
  public static long getMinSplitSize(JobContext job) { 
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); 
  } 

  So by default minSize works out to 1L.
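As a hedged usage sketch of my own (not from the article): both bounds can be adjusted per job with the setters listed earlier in FileInputFormat, which simply write the SPLIT_MINSIZE / SPLIT_MAXSIZE configuration keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-size-demo");
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);    // mapreduce.input.fileinputformat.split.minsize
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);   // mapreduce.input.fileinputformat.split.maxsize
    // computeSplitSize(blockSize, minSize, maxSize) then clamps the block size into
    // [64 MB, 256 MB]; with a 128 MB block the split size stays 128 MB.
  }
}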

The split size itself is computed at line 399: computeSplitSize(blockSize, minSize, maxSize) returns max(minSize, min(maxSize, blockSize)), which with the defaults (minSize = 1, maxSize = Long.MAX_VALUE) is simply the block size. So by default an InputSplit and an HDFS block are both 128 MB; in other words, one map task processes about one block's worth of data.

397        if (isSplitable(job, path)) { 
398          long blockSize = file.getBlockSize(); 
399          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

     When the InputSplit size differs from the block size, network transfer becomes necessary. If a split is larger than a block, one block is not enough to serve the split, so data from another block (possibly on another node) must be fetched. If a split is smaller than a block, part of the block is left over to be processed by some other map task, which again may move data across the network; keeping the split size equal to the block size is what preserves data locality.

    Also note that getSplits() iterates over the files with a for loop, so every input file is split independently.


 

 (7) How many splits do two 50 MB files, one 200 MB file and one empty file produce?

     An empty file still produces a split. Each 50 MB file yields one split (two in total), and the 200 MB file yields two (200 / 128 ≈ 1.56 > 1.1 gives a 128 MB split, and the remaining 72 MB becomes the second), plus one for the empty file: five splits, hence five map tasks.

 **********************************************************

Part 2: feeding the map task through createRecordReader()

    

 (1) Reading createRecordReader()

/** An {@link InputFormat} for plain text files.  Files are broken into lines. 
 * Either linefeed or carriage-return are used to signal end of line.  Keys are 
 * the position in the file, and values are the line of text.. */ 
@InterfaceAudience.Public 
@InterfaceStability.Stable 
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { 
 
  @Override 
  public RecordReader<LongWritable, Text>  
    createRecordReader(InputSplit split, 
                       TaskAttemptContext context) { 
    String delimiter = context.getConfiguration().get( 
        "textinputformat.record.delimiter"); 
    byte[] recordDelimiterBytes = null; 
    if (null != delimiter) 
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); // the custom delimiter is decoded as UTF-8;
                                                                 // input in another encoding will be mis-parsed
    return new LineRecordReader(recordDelimiterBytes);
  }
}
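For example (a hedged sketch of my own, using only the configuration key read above), a job can override the record delimiter so that blank-line-separated paragraphs become records instead of single lines:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CustomDelimiterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("textinputformat.record.delimiter", "\n\n");  // the key queried in createRecordReader()
    Job job = Job.getInstance(conf, "custom-delimiter-demo");
    job.setInputFormatClass(TextInputFormat.class);        // LineRecordReader will now split on blank lines
  }
}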

LineRecordReader is the RecordReader subclass returned above:

/** 
 * Treats keys as offset in file and value as line.   (the key is the byte offset, the value is the text of the line) 
 */ 
@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"}) 
@InterfaceStability.Evolving 
public class LineRecordReader extends RecordReader<LongWritable, Text> { 
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class); 
  public static final String MAX_LINE_LENGTH =  
    "mapreduce.input.linerecordreader.line.maxlength"; 
 
  private long start; 
  private long pos; 
  private long end;

The RecordReader class:

 /** 
   * Called once at initialization. 
   * @param split the split that defines the range of records to read 
   * @param context the information about the task 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract void initialize(InputSplit split,    // initialization; called exactly once 
                                  TaskAttemptContext context 
                                  ) throws IOException, InterruptedException; 
 
  /** 
   * Read the next key, value pair. 
   * @return true if a key/value pair was read 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract     // read the next key/value pair -- this pair becomes the map side's k1 and v1 
  boolean nextKeyValue() throws IOException, InterruptedException; 
 
  /** 
   * Get the current key   // returns the key that was assigned in nextKeyValue() 
   * @return the current key or null if there is no current key 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract 
  KEYIN getCurrentKey() throws IOException, InterruptedException; 
   
  /** 
   * Get the current value. 
   * @return the object that was read 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract  
  VALUEIN getCurrentValue() throws IOException, InterruptedException; 
   
  /** 
   * The current progress of the record reader through its data. 
   * @return a number between 0.0 and 1.0 that is the fraction of the data read 
   * @throws IOException 
   * @throws InterruptedException 
   */ 
  public abstract float getProgress() throws IOException, InterruptedException; 
   
  /** 
   * Close the record reader. 
   */ 
  public abstract void close() throws IOException; 
}

    Note that the abstract RecordReader above exposes no key or value fields directly, yet it provides getters for both. The key and value therefore live as fields of the concrete subclass; they are assigned inside the method body (nextKeyValue()) and then fetched with getCurrentKey() and getCurrentValue().

    This is the same cursor-style pattern as JDBC's while (rs.next()) { rs.getLong(...); }, or Enumeration's hasMoreElements()/nextElement() iteration over a Hashtable.

    So the consuming code can be read as:

          while (rr.nextKeyValue()) { key = rr.getCurrentKey(); value = rr.getCurrentValue(); map(key, value, context); }
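A minimal compilable sketch of that guessed pattern (my paraphrase; the real driver is Mapper.run(), shown in Part 3, and it goes through a Context rather than calling the reader directly):

import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordReader;

public class RecordReaderDrivingPattern {
  // Assumes rr has already been initialize()d for some split.
  static <K, V> void drive(RecordReader<K, V> rr) throws IOException, InterruptedException {
    while (rr.nextKeyValue()) {          // false once the split is exhausted
      K key = rr.getCurrentKey();        // both were assigned inside nextKeyValue()
      V value = rr.getCurrentValue();
      System.out.println(key + "\t" + value);  // a real runner would call map(key, value, context)
    }
  }
}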

    The LineRecordReader source confirms this guess: its key and value are class fields with fixed types (LongWritable and Text), which is why a MapReduce program never has to spell out <k1, v1>. Internally, a SplitLineReader does the actual line reading.

(2) In initialize(), split.getStart() is the byte offset at which this split's data begins; it has nothing to do with line boundaries. That start offset is assigned to pos, the current read position. Now look up the nextKeyValue() method.

    In the LineRecordReader class:

public boolean nextKeyValue() throws IOException { 
    if (key == null) { 
      key = new LongWritable(); 
    } 
    key.set(pos); 
    if (value == null) { 
      value = new Text(); 
    } 
    int newSize = 0; 
    // We always read one extra line, which lies outside the upper 
    // split limit i.e. (end - 1) 
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { 
      if (pos == 0) { 
        newSize = skipUtfByteOrderMark(); 
      } else { 
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); 
        pos += newSize; 
      } 
 
      if ((newSize == 0) || (newSize < maxLineLength)) { 
        break; 
      } 
 
      // line too long. try again 
      LOG.info("Skipped line of size " + newSize + " at pos " +  
               (pos - newSize)); 
    } 
    if (newSize == 0) { 
      key = null; 
      value = null; 
      return false; 
    } else { 
      return true; 
    } 
  }

For example, take a file containing the two lines:

      hello you
      hello me

     The whole file ends up in one split, and end = 19 (the file length in bytes). On the first call to nextKeyValue(), start = 0, pos = 0, key = 0, value = "hello you" and newSize = 10 (nine characters plus the newline). On the second call, key = 10, value = "hello me" and newSize = 9.

     The readLine() method reads the given text from the input stream and returns the number of bytes it consumed:

      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); // reads one line into value; the return value
                                                                           // is the number of bytes read, including the
                                                                           // line terminator

     In short, key and value get their values inside nextKeyValue().

Part 3: once key and value have been assigned, how do they get passed to the map function?

 *****************************************************************

Starting the analysis from the Mapper class:   

/** 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */ 
 
package org.apache.hadoop.mapreduce; 
 
import java.io.IOException; 
 
import org.apache.hadoop.classification.InterfaceAudience; 
import org.apache.hadoop.classification.InterfaceStability; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.RawComparator; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.mapreduce.task.MapContextImpl; 
 
/**  
 * Maps input key/value pairs to a set of intermediate key/value pairs.   
 *  
 * <p>Maps are the individual tasks which transform input records into a  
 * intermediate records. The transformed intermediate records need not be of  
 * the same type as the input records. A given input pair may map to zero or  
 * many output pairs.</p>  
 *  
 * <p>The Hadoop Map-Reduce framework spawns one map task for each  
 * {@link InputSplit} generated by the {@link InputFormat} for the job. 
 * <code>Mapper</code> implementations can access the {@link Configuration} for  
 * the job via the {@link JobContext#getConfiguration()}. 
 *  
 * <p>The framework first calls  
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by 
 * {@link #map(Object, Object, Context)}  
 * for each key/value pair in the <code>InputSplit</code>. Finally  
 * {@link #cleanup(Context)} is called.</p> 
 *  
 * <p>All intermediate values associated with a given output key are  
 * subsequently grouped by the framework, and passed to a {@link Reducer} to   
 * determine the final output. Users can control the sorting and grouping by  
 * specifying two key {@link RawComparator} classes.</p> 
 * 
 * <p>The <code>Mapper</code> outputs are partitioned per  
 * <code>Reducer</code>. Users can control which keys (and hence records) go to  
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}. 
 *  
 * <p>Users can optionally specify a <code>combiner</code>, via  
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the  
 * intermediate outputs, which helps to cut down the amount of data transferred  
 * from the <code>Mapper</code> to the <code>Reducer</code>. 
 *  
 * <p>Applications can specify if and how the intermediate 
 * outputs are to be compressed and which {@link CompressionCodec}s are to be 
 * used via the <code>Configuration</code>.</p> 
 *   
 * <p>If the job has zero 
 * reduces then the output of the <code>Mapper</code> is directly written 
 * to the {@link OutputFormat} without sorting by keys.</p> 
 *  
 * <p>Example:</p> 
 * <p><blockquote>
 
 * public class TokenCounterMapper  
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{ 
 *     
 *   private final static IntWritable one = new IntWritable(1); 
 *   private Text word = new Text(); 
 *    
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
 *     StringTokenizer itr = new StringTokenizer(value.toString()); 
 *     while (itr.hasMoreTokens()) { 
 *       word.set(itr.nextToken()); 
 *       context.write(word, one); 
 *     } 
 *   } 
 * } 
 *
 * </blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);        // runs exactly once, before the loop
    try {
      while (context.nextKeyValue()) {
        // map() is invoked once for every key/value pair of this InputSplit
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);    // runs exactly once, after the loop
    }
  }
}

Now look at where context.nextKeyValue() comes from. Using Ctrl+T (open implementation) on nextKeyValue() leads to MapContextImpl (org.apache.hadoop.mapreduce.task.MapContextImpl), which provides the implementation; inside it, the work is delegated to a reader field declared as a RecordReader.

Following the Context type used in run(): the context handed to the Mapper is a WrappedMapper.Context, whose mapContext field is assigned in its constructor. WrappedMapper.Context.nextKeyValue() simply forwards to that mapContext; tracing the implementation again lands on MapContextImpl, whose reader is ultimately declared as a RecordReader.

To sum up this chain: the framework invokes MapContextImpl's parameterized constructor and passes the RecordReader in (around line 57 of that class), which is what lets its nextKeyValue() (around line 80) delegate to the reader; the Context that Mapper.run() holds then calls that nextKeyValue().

 Summary: from the point of view of the source code, how does the <k1, v1> processed by the map function get out of an HDFS file? Answer:

1. Start from TextInputFormat, move up to its parent FileInputFormat and then to InputFormat.
InputFormat declares two methods, getSplits(...) and createRecordReader(...).
The javadoc tells us that getSplits(...) parses everything in the input file set into individual InputSplits, with one InputSplit per mapper task, while
createRecordReader(...) creates a RecordReader implementation, whose job is to parse an InputSplit into <k, v> pairs.
2. FileInputFormat holds the getSplits(...) implementation. From it we learn that
(1) each split size matches the default block size, which preserves data locality, and
(2) every input file produces an InputSplit, even an empty file;
a very large file is cut into multiple InputSplits of splitSize each.
3. TextInputFormat holds the createRecordReader(...) implementation, which returns a LineRecordReader, so that class is analyzed next.
In RecordReader we see that key and value exist as fields of the class and how nextKeyValue() is meant to be used.
In LineRecordReader the focus is nextKeyValue(...), in particular newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
in in.readLine(...) the first argument receives the text of the line that was read and the return value is the number of bytes consumed.
This is how the content of an InputSplit turns into one <k, v> pair after another.
4. Finally, the Mapper class exposes setup(), cleanup(), map() and run().
In run(), a while loop calls context.nextKeyValue(...) and hands each pair to map(...).
The Context used there is org.apache.hadoop.mapreduce.lib.map.WrappedMapper.Context, which delegates nextKeyValue(...) to a MapContext; the implementation of MapContext is org.apache.hadoop.mapreduce.task.MapContextImpl.
That class's constructor is where the RecordReader implementation is passed in.
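To close with a hedged end-to-end sketch (my own wiring, not from the article): a map-only job that uses TextInputFormat and the identity Mapper, so the (LongWritable offset, Text line) pairs analyzed above are written straight to the output:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityDriverSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "k1-v1-demo");
    job.setJarByClass(IdentityDriverSketch.class);
    job.setInputFormatClass(TextInputFormat.class);   // getSplits() + LineRecordReader, as analyzed above
    job.setMapperClass(Mapper.class);                 // identity Mapper: writes (k1, v1) straight through
    job.setNumReduceTasks(0);                         // map-only job
    job.setOutputKeyClass(LongWritable.class);        // k1: byte offset
    job.setOutputValueClass(Text.class);              // v1: the line
    FileInputFormat.addInputPath(job, new Path(args[0]));    // input dir passed on the command line
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output dir passed on the command line
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}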

 

 

 

Original article by 奋斗. If you republish it, please credit the source: https://blog.ytso.com/tech/bigdata/9094.html
