Mapreduce中:
shuffle阶段是在map和reduce之间,可以自定义排序,自定义分区和自定义分组!
Mapreduce中,map出的数据是键值对,默认的是hashPatitionner来对map出的数据进行分区;
分区的方法还有其他几个:
RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.5, 3000, 10); IntervalSampler<Text, Text> sampler2 = new InputSampler.IntervalSampler<Text, Text>(0.333, 10); SplitSampler<Text, Text> sampler3 = new InputSampler.SplitSampler<Text, Text>(reduceNumber);
实现和细节
public class TotalSortMR { @SuppressWarnings("deprecation") public static int runTotalSortJob(String[] args) throws Exception { Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Path partitionFile = new Path(args[2]); int reduceNumber = Integer.parseInt(args[3]); //三种采样器 RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(1, 3000, 10); IntervalSampler<Text, Text> sampler2 = new InputSampler.IntervalSampler<Text, Text>(0.333, 10); SplitSampler<Text, Text> sampler3 = new InputSampler.SplitSampler<Text, Text>(reduceNumber); //任务初始化 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJobName("Total-Sort"); job.setJarByClass(TotalSortMR.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(reduceNumber); //设置所有的分区类 job.setPartitionerClass(TotalOrderPartitioner.class); //分区类参考的分区文件 TotalOrderPartitioner.setPartitionFile(conf, partitionFile); //分区使用哪种采样器 InputSampler.writePartitionFile(job, sampler); //job的输入和输出路径 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); return job.waitForCompletion(true)? 0 : 1; } public static void main(String[] args) throws Exception{ System.exit(runTotalSortJob(args)); } }
job默认的输入格式是TextInputFormat,这个是key-value的形式,key是每行的行标,value是每行的内容。可以更改
job.setInputFormatClass(,....)
一般要设置mapper的输出格式,以备后面使用。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/193216.html