MapReduce的典型编程场景3

1. 自定义InputFormat –数据分类输出

  需求:小文件的合并

  分析:

     – 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
     – 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
     – 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率

  实现思路:

     – 编写自定义的InoputFormat
     – 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
     – 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)


代码实现

public class MergeDriver {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            //设置自定义输入的类
            job.setInputFormatClass(MyMyFileInputForamt.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");

            //这里使用自定义得我FileInputForamt去格式化input
            MyMyFileInputForamt.addInputPath(job,input);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    static private class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        /*
            这里的map方法就是每读取一个文件调用一次
         */
        @Override
        protected void map(NullWritable key, Text value, Mapper<NullWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values,
                              Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }
    //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {

        // 输出的value对象
        Text map_value = new Text();

        // 文件系统对象,用于获取文件的输入流
        FileSystem fs;

        // 判断当前文件是否已经读完
        Boolean isReader = false;

        //文件的切片信息
        FileSplit fileSplit;

        //初始化方法,类似于Mapper中的setup,整个类最开始运行
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            //初始化文件系统对象
            fs = FileSystem.get(context.getConfiguration());
            //获取文件路径
            fileSplit = (FileSplit) split;
        }

        //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            //先读取一次
            if (!isReader) {
                FSDataInputStream input = fs.open(fileSplit.getPath());
                //一次性将整个小文件内容都读取出来
                byte flush[] = new byte[(int) fileSplit.getLength()];
                //将文件内容读取到这个byte数组中
                /**
                 * 参数一:读取的字节数组
                 * 参数二:开始读取的偏移量
                 * 参数三:读取的长度
                 */
                input.readFully(flush, 0, (int) fileSplit.getLength());
                isReader = true;
                map_value.set(flush);  //将读取的内容,放置在map的value中
                //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false
                return isReader;
            }
            return false;
        }
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return map_value;
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
        @Override
        public void close() throws IOException {
            fs.close();
        }
    }
    //FileInputFormat
    private static class MyMyFileInputForamt extends FileInputFormat<NullWritable, Text> {
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            MyRecordReader mr = new MyRecordReader();
            //先调用初始化方法
            mr.initialize(split, context);
            return mr;
        }
    }
}

2. 自定义OutputFormat

  需求:一些原始日志需要做增强解析处理,流程

     – 从原始日志文件中读取数据
     – 根据业务获取业务数据库中的数据
     – 根据某个连接条件获取相应的连接结果

  分析:

     – 在 MapReduce 中访问外部资源
     – 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
     – 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率


代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中

 public class Score_DiffDic {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            //设置自定义输出类型
            job.setOutputFormatClass(MyOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job,input);
            Path output = new Path("/hadoop/output/merge_output1");
            //这是自定义输出类型
            MyOutputFormat.setOutputPath(job,output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private static class MyMapper extends Mapper<LongWritable,Text,Text,DoubleWritable>{
        Text mk=new Text();
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
             String[] fields = value.toString().split("//s+");
            //computer,huangxiaoming,85
            if(fields.length==3){
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double  sum=0;
            int count=0;
            for(DoubleWritable value:values){
                sum+=value.get();
                count++;
            }
            mv.set(sum/count);
            context.write(key,mv);
        }
    }

    //FileOutputFormat
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            FileSystem fs =FileSystem.get(job.getConfiguration());
            return new MyRecordWrite(fs);
        }
    }

    //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型
    private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> {
        FileSystem fs;
        //输出的文件的路径
        Path path2 = new Path("/hadoop/output/score_out1");
        Path path3 = new Path("/hadoop/output/score_out2");
        FSDataOutputStream output1;
        FSDataOutputStream output2;

        public MyRecordWrite() {

        }
        //初始化参数
        public MyRecordWrite(FileSystem fs) {
            this.fs = fs;
            try {
                output1=fs.create(path2);
                output2=fs.create(path3);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中
            if(value.get()>80){
                output1.write((key.toString()+":"+value.get()+"/n").getBytes());
            }else{
                output2.write((key.toString()+":"+value.get()+"/n").getBytes());
            }
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            fs.close();
            output1.close();
            output2.close();
        }
    }
}

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/192749.html

(0)
上一篇 2021年11月15日
下一篇 2021年11月15日

相关推荐

发表回复

登录后才能评论