问题分析
求工资比上司高的员工姓名及工资,需要得到每位上司的工资及其所有直属下属的工资,通过比较两者的高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据:经理数据的key为员工编号、value为“M,该员工工资”(每个员工都输出这样一条记录,因为任何员工都可能是其他员工的上司);员工对应经理表数据只针对有直属上司的员工输出,key为其经理编号、value为“E,该员工姓名,该员工工资”。然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据按key归组,例如编号为7698的员工,标志为M的value是其本人的工资,标志为E的value是其下属的姓名及工资;最后在Reduce中遍历比较各下属与经理的工资高低,输出工资高于经理的员工。
import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Q5EarnMoreThanManager extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] kv = value.toString().split(","); // 输出所有员工的工资键值对,以自己的编号为key,以工资为value context.write(new Text(kv[0].toString()), new Text("M," + kv[5])); // System.out.println("<"+kv[0].toString()+"---"+"M," + kv[5]+">"); if (null != kv[3] && !"".equals(kv[3].toString())) { // 输出有直属上司的雇员的工资,以其上司的编号为key值,自己的工资信息为value context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5])); // System.out.println("<"+kv[3].toString()+"---"+"E," + kv[1] + "," + kv[5]+">"); } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { // 在同一个reduce任务中 只处理一个键值对,因此在一个reduce中存放的是指定的上司和属于他的直属 下属 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String empName; long empSalary = 0; HashMap<String, Long> empMap = new HashMap<String, Long>(); long mgrSalary = 0; // 将职员和上司的工资信息分开存放,利于比较普通下属与上司的工资比较,找出符合条件的,否则该reduce任务不输出结果 for (Text val : values) { if (val.toString().startsWith("E")) { empName = 
val.toString().split(",")[1]; empSalary = Long.parseLong(val.toString().split(",")[2]); empMap.put(empName, empSalary); } else { mgrSalary = Long.parseLong(val.toString().split(",")[1]); } } for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) { if (entry.getValue() > mgrSalary) { context.write(new Text(entry.getKey()), new Text("" + entry.getValue())); } } } } @Override public int run(String[] args) throws Exception { Job job = new Job(getConf(), "Q5EarnMoreThanManager"); job.setJobName("Q5EarnMoreThanManager"); job.setJarByClass(Q5EarnMoreThanManager.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args); System.exit(res); } }
用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/9812.html