Problem Analysis
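To find the names and salaries of the employees who earn more than the company average, we need the company's average salary and every employee's salary, and then compare each salary against the average. The problem can be solved with two jobs: first compute the company's average salary, then compare every employee against it. It can also be solved in a single job, in which case the job's setNumReduceTasks method is used to set the number of reduce tasks to 1, so that all map output goes to one reduce task. A single reduce task is enough to process the data in this example, and it can compute the average salary first and then do the comparison to produce the result.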
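The two-job alternative can be sketched in plain Java, without Hadoop, as two passes over the same records: the first pass computes the average and the second filters against it. This is only an illustration under stated assumptions: the class name `TwoPassAverage`, the method names, and the sample employees are hypothetical, and in a real cluster each pass would be its own MapReduce job, with the average handed to the second job by some means such as the job Configuration.

```java
import java.util.ArrayList;
import java.util.List;

class TwoPassAverage {
    // Pass 1 (the first job): integer average of all salaries,
    // using long division as the single-job code does.
    static long average(long[] salaries) {
        long total = 0;
        for (long s : salaries) {
            total += s;
        }
        return salaries.length == 0 ? 0 : total / salaries.length;
    }

    // Pass 2 (the second job): keep the employees whose salary exceeds the average.
    static List<String> higherThan(String[] names, long[] salaries, long average) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            if (salaries[i] > average) {
                result.add(names[i] + "," + salaries[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"A", "B", "C"};      // hypothetical employees
        long[] salaries = {1000, 2000, 6000};  // hypothetical salaries
        long avg = average(salaries);
        System.out.println("Average Salary = " + avg);
        System.out.println(higherThan(names, salaries, avg));
    }
}
```

The trade-off is that the two-pass version reads the input twice but keeps no state between records, while the single-job version reads the input once and depends on one reduce task seeing everything.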
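In the Mapper phase, every employee record is emitted twice: once with key 0 and the employee's salary as the value, and once with key 1 and "employee name,salary" as the value. The Shuffle phase then groups the incoming data by key, which gives two groups in this job, one for key 0 and one for key 1. Finally, in Reduce, the key 0 group is used to sum all salaries and count the employees, which yields the average salary; for the key 1 group, each employee's salary is compared with the average, and the employees who earn more than the average are written out with their salaries. Because the reducer receives keys in sorted order, the key 0 group is processed before the key 1 group, so the average is already known when the comparison starts.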
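The flow described above can be simulated in plain Java, with a TreeMap standing in for the shuffle (it groups values by key and iterates keys in ascending order). This is a minimal sketch, not the Hadoop job itself: the class name `TaggedKeySketch` and the sample lines are hypothetical, and the only thing taken from the job is that column 1 holds the name and column 5 holds the salary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TaggedKeySketch {
    // Map step: emit <0, salary> and <1, "name,salary"> for every input line.
    static void map(String line, Map<Integer, List<String>> groups) {
        String[] kv = line.split(",");
        groups.computeIfAbsent(0, k -> new ArrayList<>()).add(kv[5]);
        groups.computeIfAbsent(1, k -> new ArrayList<>()).add(kv[1] + "," + kv[5]);
    }

    // Reduce step: groups arrive in ascending key order, so the key 0 group is
    // finished (and the average is known) before any key 1 record is examined.
    static List<String> reduce(TreeMap<Integer, List<String>> groups) {
        long allSalary = 0;
        long allEmpCount = 0;
        long aveSalary = 0;
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : groups.entrySet()) {
            if (group.getKey() == 0) {
                for (String val : group.getValue()) {
                    allSalary += Long.parseLong(val);
                    allEmpCount++;
                }
                aveSalary = allSalary / allEmpCount;
            } else {
                for (String val : group.getValue()) {
                    String[] parts = val.split(",");
                    if (Long.parseLong(parts[1]) > aveSalary) {
                        output.add(val);
                    }
                }
            }
        }
        return output;
    }

    static List<String> run(String[] lines) {
        // Shuffle stand-in: group by key, sorted by key.
        TreeMap<Integer, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            map(line, groups);
        }
        return reduce(groups);
    }

    public static void main(String[] args) {
        // Hypothetical records; only columns 1 and 5 matter here.
        String[] lines = {
            "1,ALICE,x,x,x,1000",
            "2,BOB,x,x,x,2000",
            "3,CAROL,x,x,x,6000"
        };
        System.out.println(run(lines));
    }
}
```

With more than one reduce task, keys 0 and 1 could land on different reducers and the key 1 side would never see the totals, which is why the job fixes the number of reduce tasks at 1.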
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q6HigherThanAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split one comma-separated employee record into columns
            String[] kv = value.toString().split(",");

            // Emit <0, salary>, used to compute the average salary
            context.write(new IntWritable(0), new Text(kv[5]));

            // Emit <1, "name,salary">, used for the comparison with the average
            context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        // State kept across reduce() calls; valid only because there is a single reduce task
        private long allSalary = 0;
        private int allEmpCount = 0;
        private long aveSalary = 0;
        private boolean aveComputed = false;

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                if (0 == key.get()) {
                    // Accumulate the salary total and the employee count
                    allSalary += Long.parseLong(val.toString());
                    allEmpCount++;
                } else if (1 == key.get()) {
                    if (!aveComputed) {
                        // Key 0 sorts before key 1, so the totals are complete here
                        aveSalary = allSalary / allEmpCount;
                        aveComputed = true;
                        context.write(new Text("Average Salary = "), new Text("" + aveSalary));
                        context.write(new Text("Following employees have salaries higher than Average:"), new Text(""));
                    }

                    // Value layout is "name,salary"
                    String[] nameAndSalary = val.toString().split(",");
                    long empSalary = Long.parseLong(nameAndSalary[1]);
                    if (empSalary > aveSalary) {
                        context.write(new Text(nameAndSalary[0]), new Text("" + empSalary));
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // This constructor is deprecated in Hadoop 2.x in favor of Job.getInstance
        Job job = new Job(getConf(), "Q6HigherThanAveSalary");
        job.setJarByClass(Q6HigherThanAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // One reduce task, so the same reducer sees both key 0 and key 1
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        // The reducer writes Text values, so the output value class is Text
        job.setOutputValueClass(Text.class);

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
        System.exit(res);
    }
}
For the base data used in the computation, see: http://blog.ytso.com/post/17840.html
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/9813.html