MapReduce的典型编程场景1

接下来通过一个实际的案例,介绍在MR编程中的,partition、sort、combiner。

  

流量统计项目案例

数据样本
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4
2052.flash3-http.qq.com 综合门户 15 12 1938 2910 200
字段介绍
MapReduce的典型编程场景1
需求
1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
2、 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中


  
需求一:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
  通过需求分析 ,我们可以知道这里可以使用combiner这个优化组件,它的作用是在 maptask之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输。
  使用方式:Combiner 和 Reducer 一样,编写一个类,然后继承 Reducer,reduce 方法中写具体的 Combiner逻辑,然后在 job 中设置 Combiner 组件:job.setCombinerClass(FlowSumCombine.class)
代码实现

public class FlowSum {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJobName("FlowSum");
            //设置任务类
            job.setJarByClass(FlowSum.class);
            //设置Mapper  Reducer  Combine
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setCombinerClass(FlowSumCombine.class);

            //设置map 和reduce 的输入输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // 指定该 mapreduce 程序数据的输入和输出路径
            Path input=new Path("/data/input");
            Path output =new Path("/data/output");
            //一定要保证output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //递归删除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            // 最后提交任务
            boolean success = job.waitForCompletion(true);
            System.exit(success?0:-1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Mapper
    private class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        //统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
        //1363157984040
        // 13602846565
        // 5C-0E-8B-8B-B6-00:CMCC
        // 120.197.40.4
        //2052.flash3-http.qq.com
        // 综合门户
        // 15
        // 12
        // 1938  上行流量
        // 2910  下行流量
        // 200
        Text mk=new Text();
        Text mv=new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("//s+");
            String phone = fields[0];
            String upFlow = fields[8];
            String downFlow = fields[9];
            mk.set(phone);
            mv.set(upFlow+"_"+downFlow);
            context.write(mk,mv);
        }
    }

    //Combiner
    private class FlowSumCombine extends Reducer<Text, Text,Text, Text> {
        Text rv=new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum=0;
            int downFlowSum=0;
            int  upFlow = 0;
            int downFlow =0;
            for(Text value:values){
                String fields[]=value.toString().split("_");
                upFlow=Integer.parseInt(fields[0]);
                downFlow=Integer.parseInt(fields[1]);
                upFlowSum+=upFlow;
                downFlowSum+=downFlow;
            }
            rv.set(upFlowSum+"_"+downFlowSum);
            context.write(key,rv);
        }
    }

    //Reducer
    private class MyReducer extends Reducer<Text, Text,Text, Text> {
        Text rv=new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum=0;
            int downFlowSum=0;
            int  upFlow = 0;
            int downFlow =0;
            for(Text value:values){
                String fields[]=value.toString().split("_");
                upFlow=Integer.parseInt(fields[0]);
                downFlow=Integer.parseInt(fields[1]);
                upFlowSum+=upFlow;
                downFlowSum+=downFlow;
            }
            rv.set(upFlowSum+"//t"+downFlowSum);
            context.write(key,rv);
        }
    }
}

  使用 Combiner 注意事项
   - Combiner 的输出 kv 类型应该跟 Reducer 的输入 kv 类型对应起来
   - Combiner 的输入 kv 类型应该跟 Mapper 的输出 kv 类型对应起来
   - Combiner 的使用要非常谨慎,因为 Combiner 在MapReduce 过程中可能调用也可能不调用,可能调一次也可能调多次
   - Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果

需求二:得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
   分析:如果是在得出上行流量和下行流量之后,实现倒叙排序呢,之前在java中,如果想让对象按照自定义的规则排序,那么就需要自定义对象并且实现它的比较器。MR也可以,在MR运行过程中,如果有Reducer阶段的话,那么是一定会排序的,根据对象的比较器,进行排序,将排序结果相同的key分到一个reduceTask中。
  实现:这里我们可以使用hadoop自定义WritableComparable来自定义对象,并且实现它的比较器。
代码实现(注意:这里的是对需求一实现之后的结果进行处理的):
自定义对象,实现比较器

public class FlowBean implements WritableComparable<FlowBean> {
    private String phone;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 序列化框架在反序列化操作创建对象实例时会调用无参构造
    public FlowBean(){

    }

    public void set(String phone, long upFlow, long downFlow){
        this.phone=phone;
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        this.sumFlow=upFlow+downFlow;
    }
    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // 序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    /*
        反序列化方法,这里注意,上面的write中是什么顺写,这里就要什么顺序读取
        要保证字段数据类型一一对应
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone=in.readUTF();
        this.upFlow=in.readLong();
        this.downFlow=in.readLong();
        this.sumFlow=in.readLong();
    }
    //这里就是要实现的比较方法
    // 0 表示相等, 1表示大于  负数表示小于
    @Override
    public int compareTo(FlowBean o) {
        //倒叙输出的话就是,参数的属性-类中的属性
        return (int)(o.sumFlow-this.sumFlow);
    }
}

MR程序

public class FlowSumSort {
    //job
    public static void main(String[] args) {
        Configuration conf=new Configuration(true);
        conf.set("fs.defaultFS","hdfs://zzy:9000");
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job= Job.getInstance(conf);
            job.setJarByClass(FlowSumSort.class);
            job.setJobName("FlowSumSort");

            job.setMapperClass(Mapper.class);
            job.setReducerClass(Reducer.class);

            job.setOutputKeyClass(FlowBean.class);
            job.setOutputValueClass(NullWritable.class);

            // 指定该 mapreduce 程序数据的输入和输出路径
            Path input=new Path("//data/output");
            Path output =new Path("/data/output1");
            //一定要保证output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //递归删除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            boolean success=job.waitForCompletion(true);
            System.exit(success?0:1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private class MyMapper extends Mapper<LongWritable,Text,FlowBean,NullWritable> {
        FlowBean bean=new FlowBean();
        NullWritable mv=NullWritable.get();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
             String[] fields = value.toString().split("//s+");
             String phone=fields[0];
             long upflow= Long.parseLong(fields[1]);
             long downflow= Long.parseLong(fields[2]);
             bean.set(phone,upflow,downflow);
            context.write(bean,mv);
        }
    }
    //Reducer
    private class MyReducer extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable>{
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            for(NullWritable value:values){
                context.write(key,value);
            }
        }
    }
}

注意:虽然这里reduce阶段只是做了一个输出,没有任何业务操作,但是不能不使用reduce,因为MR的排序就是在MapTask运行完成之后,向reduceTask端输出数据的时候,才会进行排序,如果没有Reduce阶段就不会有排序。

  
需求三:根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到省级范围进行
分析:默认的在运行MR程序的时候,只运行一个ReducerTask,默认额的一个ReducerTask会有一个输出文件,那么只要自定义分区规则,并且设置好ReducerTask的个数,就可以完成以上需求,默认的MR的分区规则是,key的hashcode%分区数
代码实现
自定义分区

    //这里的两个泛型,是Mpapper端输出的key和value的类型
    private static class MyPartitioner extends Partitioner<Text,FlowBean> {
        //自定义的分区规则
        private static HashMap<String, Integer> provincMap = new HashMap<String, Integer>();
        static {
            provincMap.put("138", 0);
            provincMap.put("139", 1);
            provincMap.put("136", 2);
            provincMap.put("137", 3);
            provincMap.put("135", 4);
        }
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            //千万注意,这里的返回值,一定不能大于numPartitions,否则会报错
            Integer code = provincMap.get(text.toString().substring(0, 3));
            if(code!=null){
                return code;
            }
            return 5;
        }
    }

job

            //指定分区规则,和分区个数
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(5);

注意

  • 在使用分区时,一定注意,分区的返回值一定不能大于设置的reduceTask个数
  • 虽然设置多个ReduceTask可以增加并行度,但是也不需要设置太多,如果某个ReduceTask中没有数据,那么这个ReduceTask就是空跑,浪费资源
  • 尽量的在设置分区时是从0开始的连续的整数

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/197195.html

(0)
上一篇 2021年11月17日
下一篇 2021年11月17日

相关推荐

发表回复

登录后才能评论