一、基本原理
在map执行之前,需要将数据进行切片,每个切片对应一个map任务。而每个map任务并不是直接处理这些切片数据的,它是处理KV的。所以问题有两个:数据是如何切片的、切片是如何转为KV给map处理的。
这就涉及到两个抽象类,InputFormat以及 RecordReader。具体为什么是这两个抽象类,请看之前input的源码分析
1、InputFormat
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
我们看到,这个抽象类就两个方法
getSplits:看名字就知道是用来将数据处理成切片的了
createRecordReader:就是用来创建RecordReader对象的。
所以这就是一个InputFormat基本的功能
2、 RecordReader
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
//初始化,一般就是读取切片的数据
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
//检查是否还有下一对KV,并且如果有,实际上会将其处理成KV,并赋值给this.key和this.value
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//返回一个key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//返回一个value
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//返回是否在处理
public abstract float getProgress() throws IOException, InterruptedException;
//关闭reader
public abstract void close() throws IOException;
}
这个抽象类就涉及到读取切片的数据,处理成KV结构。而在input源码分析中说到,mapper.run方法中通过 context.getCurrentKey() 类似的方法获取key其实就是调用这个RecordReader中的这些get方法而已。
3、InputFormat以及 RecordReader的关系
从上面的源码可以看到。
InputFormat:负责规划切片信息,以及创建RecordReader对象
RecordReader:负责按照切片规划去读取当前mapper处理的切片数据,并将其处理成KV形式,然后通过context传递给mapper。
二、InputFormat以及 RecordReader常用实现类
常用的有:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat(自定义有另外的文章讲)
1、TextInputFormat
这是默认的InputFormat,切片方式是按数据块的方式切割,默认大小block大小。一个文件至少是一个切片(无论多小)。因为这个类继承FileInputFormat,使用的是其父类定义的getsplit() 方法进行切片。
使用的RecordReader是LineRecordReader。处理切片成KV时,每条记录是一行输入。键K是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。
2、KeyValueTextInputFormat
这个类也是使用父类FileInputFormat的getsplit() 方法进行切片,所以切片方式和上面一致。
使用的RecordReader是KeyValueLineRecordReader。每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");来设定分隔符。默认分隔符是tab(/t)。
3、NLineInputFormat
这个类虽然继承了FileInputFormat,但是自己重写了getSplit方法,使用另外的方式来切片。是按指定的行数来切片,比如5行,那就5行作为一个切片,无论数据大小。通过mapreduce.input.lineinputformat.linespermap 这个参数设置切片行数。
使用的RecordReader是LineRecordReader。和上面类似,不重复说。
4、CombineTextInputFormat
这个类继承于 CombineFileInputFormat,父类继承于FileInputFormat。在CombineFileInputFormat中重写了 getSplits方法。因为FileInputFormat默认无论多小的文件,一个文件至少是一个切片。如果遇到很多小文件,就会导致很多切片。而这里的切片方式就是严格按照大小来切片,会将小文件集合在一起,达到指定大小,才作为一个切片。
使用的RecordReader是CombineFileRecordReader。处理方式和 LineRecordReader类似,只不过切片可能是来自多个文件,读取方式上略显麻烦。
三、设置使用指定的inputformat
job.setInputFormatClass(xxxInputFormat.class);
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/190456.html