1. 自定义 InputFormat – 小文件合并
需求:小文件的合并
分析:
– 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
– 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
– 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率
实现思路:
– 编写自定义的 InputFormat(并重写 isSplitable() 返回 false,保证一个文件不会被切分成多个分片)
– 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
– 在 Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)(下面代码中对应的类名为 MyMyFileInputForamt)
代码实现:
/**
 * MapReduce job that merges many small files into one output by handing each whole file to a
 * single map() call and funnelling every file through one reduce key.
 */
public class MergeDriver {

    /**
     * Configures and runs the merge job.
     *
     * <p>Exits with status 0 when the job succeeds, and 1 when it fails or cannot be submitted.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        try {
            Job job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // Use the custom input format so that each map() call receives one complete file.
            job.setInputFormatClass(MyMyFileInputForamt.class);
            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");
            // addInputPath is a static FileInputFormat helper; it only records the path on the job.
            FileInputFormat.addInputPath(job, input);
            // Remove the previous run's output; FileOutputFormat refuses to write into an existing dir.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure to the caller instead of silently exiting with status 0.
            System.exit(1);
        }
    }

    /** Passes each (NullWritable, whole-file contents) record straight through. */
    private static class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        /** Called once per input file, because MyRecordReader yields exactly one record per file. */
        @Override
        protected void map(NullWritable key, Text value, Mapper<NullWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /** Writes every file's contents under the single NullWritable key, concatenating them into one output. */
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values,
                Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }

    /**
     * Reads an entire file split as a single record. The key is {@link NullWritable}; the value holds the
     * raw bytes of the split.
     */
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {
        /** Receives the split's bytes once they have been read. */
        private final Text mapValue = new Text();
        /** The split being read; set in initialize(). */
        private FileSplit fileSplit;
        /** Job configuration, used to resolve the FileSystem that owns the split's path. */
        private Configuration conf;
        /** True once the single record has been produced. */
        private boolean processed = false;

        /** Called by the framework before the first nextKeyValue(); only records the split and config. */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            fileSplit = (FileSplit) split;
            conf = context.getConfiguration();
        }

        /**
         * Loads the whole split into the value on the first call, and returns false on every later call.
         * As a result, map() runs exactly once per split.
         *
         * @throws IOException if the split is too large for a byte array, or the read fails
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (processed) {
                return false;
            }
            long length = fileSplit.getLength();
            if (length > Integer.MAX_VALUE) {
                throw new IOException("Split too large to load into memory: "
                        + fileSplit.getPath() + " (" + length + " bytes)");
            }
            byte[] contents = new byte[(int) length];
            Path path = fileSplit.getPath();
            // Resolve the FileSystem from the path itself, so paths on a non-default scheme also work.
            FileSystem fs = path.getFileSystem(conf);
            // Close the stream when done. The FileSystem is a shared, cached instance and must stay open.
            try (FSDataInputStream in = fs.open(path)) {
                // Positional read from the split's own start offset (0 for a whole, unsplit file).
                in.readFully(fileSplit.getStart(), contents, 0, contents.length);
            }
            mapValue.set(contents);
            processed = true;
            return true;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return mapValue;
        }

        /** Reports 0 before the file has been read, and 1 after. */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed ? 1.0f : 0.0f;
        }

        /**
         * Releases nothing. The input stream is already closed in nextKeyValue(). The FileSystem comes from
         * Hadoop's shared cache, and closing it would break other users in the same JVM.
         */
        @Override
        public void close() throws IOException {
        }
    }

    /** Input format that hands each input file, unsplit, to one map() call as a single record. */
    private static class MyMyFileInputForamt extends FileInputFormat<NullWritable, Text> {
        /** Never split files: each file must reach map() whole and in one piece. */
        @Override
        protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context, Path filename) {
            return false;
        }

        /**
         * Creates the reader without initializing it. The framework calls initialize(split, context)
         * itself before reading, so calling it here too would initialize the reader twice.
         */
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new MyRecordReader();
        }
    }
}
2. 自定义 OutputFormat – 数据分类输出
需求:一些原始日志需要做增强解析处理,流程如下:
– 从原始日志文件中读取数据
– 根据业务获取业务数据库中的数据
– 根据某个连接条件获取相应的连接结果
分析:
– 在 MapReduce 中访问外部资源
– 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
– 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write()
代码实现
//这里以一个简单的案例为例,将数据按照不同的分数等级输出到不同的文件中
/**
 * MapReduce job that averages each student's scores. A custom OutputFormat then routes each
 * average to one of two files, according to whether it is above 80.
 */
public class Score_DiffDic {

    /**
     * Configures and runs the scoring job.
     *
     * <p>Exits with status 0 when the job succeeds, and 1 when it fails or cannot be submitted.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        try {
            Job job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Use the custom output format, which decides per record which file receives it.
            job.setOutputFormatClass(MyOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job, input);
            Path output = new Path("/hadoop/output/merge_output1");
            // Remove the previous run's output; FileOutputFormat refuses to write into an existing dir.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            // Still required with the custom format: FileOutputFormat checks the output spec and commits
            // task output (e.g. _SUCCESS) under this directory.
            FileOutputFormat.setOutputPath(job, output);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure to the caller instead of silently exiting with status 0.
            System.exit(1);
        }
    }

    /**
     * Parses lines such as {@code computer,huangxiaoming,85} (course, name, score) and emits
     * (name, score).
     */
    private static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        /** Fields may be separated by commas and/or whitespace. */
        private static final java.util.regex.Pattern FIELD_SEPARATOR =
                java.util.regex.Pattern.compile("[,\\s]+");

        private final Text mk = new Text();
        private final DoubleWritable mv = new DoubleWritable();

        /**
         * Emits (name, score) for each well-formed three-field line. Lines with a different field count
         * are skipped. Lines whose score is not a number are skipped and counted.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim first so that leading whitespace does not produce an empty first field.
            String[] fields = FIELD_SEPARATOR.split(value.toString().trim());
            if (fields.length != 3) {
                return;
            }
            double score;
            try {
                score = Double.parseDouble(fields[2]);
            } catch (NumberFormatException e) {
                // Skip unparseable scores (e.g. a header line) rather than failing the whole task.
                context.getCounter("Score_DiffDic", "BAD_SCORE").increment(1);
                return;
            }
            mk.set(fields[1]);
            mv.set(score);
            context.write(mk, mv);
        }
    }

    /** Emits (name, average of that name's scores). */
    private static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private final DoubleWritable mv = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
                count++;
            }
            // The framework only calls reduce() for keys with at least one value, so count > 0.
            mv.set(sum / count);
            context.write(key, mv);
        }
    }

    /** Output format that hands records to MyRecordWrite, which splits them across two fixed files. */
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            return new MyRecordWrite(fs);
        }
    }

    /**
     * Writes reducer output (name, average) as {@code name:average} lines. Averages above 80 go to
     * HIGH_SCORE_PATH and everything else goes to OTHER_SCORE_PATH.
     *
     * <p>NOTE(review): both paths are fixed and lie outside the job output directory. fs.create
     * overwrites them, so with more than one reduce task (or a retried attempt) the writers would
     * clobber each other. This writer assumes a single reducer.
     */
    private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> {
        private static final Path HIGH_SCORE_PATH = new Path("/hadoop/output/score_out1");
        private static final Path OTHER_SCORE_PATH = new Path("/hadoop/output/score_out2");

        private final FSDataOutputStream output1;
        private final FSDataOutputStream output2;

        /**
         * Opens both output files.
         *
         * @throws IOException if either file cannot be created; the first stream is closed if the second fails
         */
        public MyRecordWrite(FileSystem fs) throws IOException {
            output1 = fs.create(HIGH_SCORE_PATH);
            try {
                output2 = fs.create(OTHER_SCORE_PATH);
            } catch (IOException e) {
                try {
                    output1.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
                throw e;
            }
        }

        /** Writes {@code key:value\n} as UTF-8 to the file chosen by the average score. */
        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            byte[] line = (key.toString() + ":" + value.get() + "\n")
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            if (value.get() > 80) {
                output1.write(line);
            } else {
                output2.write(line);
            }
        }

        /**
         * Closes both streams, even if closing the first one fails. The FileSystem is left open: it
         * comes from Hadoop's shared cache, and closing it first would also invalidate these streams.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                output1.close();
            } finally {
                output2.close();
            }
        }
    }
}
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/192749.html