问题分析
求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数据进行排序。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Q9EmpSalarySort extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] kv = value.toString().split(","); int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) : Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]); context.write(new IntWritable(empAllSalary), new Text(kv[1])); } } public static class DecreaseComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } @Override public int run(String[] args) throws Exception { Job job = new Job(getConf(), "Q9EmpSalarySort"); job.setJobName("Q9EmpSalarySort"); job.setJarByClass(Q9EmpSalarySort.class); job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setSortComparatorClass(DecreaseComparator.class); // 排序规则:倒排 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args); System.exit(res); } }
用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9816.html