Big Data Techniques: Small File Handling (Custom InputFormat) Explained

7.5 Small File Handling (Custom InputFormat)

1) Requirement

Both HDFS and MapReduce lose efficiency when handling large numbers of small files, yet in practice such workloads are hard to avoid, so a concrete solution is needed. The approach here is to merge many small files into a single SequenceFile: the SequenceFile holds multiple files, with each file's path plus name as the key and the file's contents as the value.
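
To make the target layout concrete, here is a minimal sketch of writing such a SequenceFile directly, outside of MapReduce (this helper is not part of the original article; the class name SmallFileMerger is hypothetical, and the Hadoop 2.x SequenceFile.Writer option-based API is assumed). It appends one record per small file, with the file path as the key and the raw bytes as the value:

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical helper (not part of this tutorial): merge every file under a
// directory into one SequenceFile, keyed by "path + name", valued by raw bytes.
public class SmallFileMerger {

    public static void merge(Path inputDir, Path outputFile, Configuration conf) throws IOException {
        FileSystem fs = inputDir.getFileSystem(conf);
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(outputFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (!status.isFile()) {
                    continue;
                }
                // Read the whole small file into memory
                byte[] contents = new byte[(int) status.getLen()];
                FSDataInputStream in = null;
                try {
                    in = fs.open(status.getPath());
                    IOUtils.readFully(in, contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                // One record per file: key = file path, value = file contents
                writer.append(new Text(status.getPath().toString()), new BytesWritable(contents));
            }
        }
    }
}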

2) Input data

one.txt

yongpeng weidong weinan 
sanfeng luozong xiaoming

two.txt

longlong fanfan 
mazong kailun yuhang yixin 
longlong fanfan 
mazong kailun yuhang yixin

three.txt

shuaige changmo zhenqiang  
dongli lingu xuanxuan

Final expected output format: a single SequenceFile containing all three input files, with each file's path as the key and its contents as the value.

3) Analysis

Optimizing for small files comes down to a few approaches:

(1) Merge small files or small batches of data into larger files at collection time, before uploading them to HDFS.

(2) Before the business processing stage, run a MapReduce program on HDFS to merge the small files.

(3) During MapReduce processing, use CombineTextInputFormat to improve efficiency (a configuration sketch follows below).
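
For approach (3), the following is a minimal configuration sketch, not a complete job (the class name CombineInputExample, the configure method, and the 4 MB value are illustrative assumptions; CombineTextInputFormat and setMaxInputSplitSize are standard Hadoop MapReduce API):

package com.xyg.mapreduce.inputformat;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Sketch: pack many small files into fewer input splits.
public class CombineInputExample {

    public static void configure(Job job) {
        // One split may now cover several small files instead of one file each
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Upper bound on split size; 4 MB is only an illustrative value
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
    }
}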

4) Implementation approach

This section solves the small-file input problem with a custom InputFormat:

(1) Define a class that extends FileInputFormat.

(2) Override the RecordReader so that each call reads one complete file and wraps it as a single key-value pair.

(3) On the output side, use SequenceFileOutputFormat to write the merged file.

5) Code:

(1) Custom InputFormat

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Each small file must be handled as a whole, so never split it
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // 1 Create our own RecordReader
        WholeRecordReader recordReader = new WholeRecordReader();

        // 2 Initialize the RecordReader
        recordReader.initialize(split, context);

        return recordReader;
    }
}

(2) Custom RecordReader

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit split;
    private Configuration configuration;

    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Keep the split and configuration handed over by the framework
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (!processed) {
            // 1 Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            // 2 Get the file system for this split's path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 3 Read the file contents
            FSDataInputStream fis = null;
            try {
                // 3.1 Open the input stream
                fis = fs.open(path);

                // 3.2 Read the whole file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 3.3 Hand the contents to the current value
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(fis);
            }

            processed = true;

            return true;
        }

        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close here: the input stream is closed in nextKeyValue()
    }
}

(3) InputFormatDriver: the job driver

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class InputFormatDriver {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text k = new Text();

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {

            // Get the split for the current record
            InputSplit split = context.getInputSplit();
            // Get the path of the split
            Path path = ((FileSplit) split).getPath();
            // Use the full file path (including the file name) as the key
            k.set(path.toString());

            // Write the file path as key and the file contents as value
            context.write(k, value);
        }
    }

    public static void main(String[] args) throws Exception {
        // Local test paths; replace with real input/output directories as needed
        args = new String[] { "e:/inputinputformat", "e:/output1" };

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(InputFormatDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
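
To check the merged result, the output SequenceFile can be read back and dumped. The following is a minimal sketch, not from the original article (the class name SequenceFileDump is hypothetical, the part-m-00000 file name assumes the map-only job above, and the Hadoop 2.x SequenceFile.Reader option-based API is assumed):

package com.xyg.mapreduce.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Sketch: iterate over the (path, bytes) records written by the job above.
public class SequenceFileDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the driver's output directory and a map-only output file name
        Path file = new Path("e:/output1/part-m-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(file))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                // Print each merged file's path and size
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}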

 
