I. MapJoin with DistributedCache
1. Introduction to MapReduce joins
Joining two datasets on some key field is a very common need in real business scenarios. If both datasets are small, the join can be done entirely in memory. But what about large data volumes? An in-memory join would clearly hit an OOM (OutOfMemoryError). MapReduce can be used to join data at that scale.
MapReduce joins fall into two main categories: MapJoin and ReduceJoin.
ReduceJoin first:
(1) In the map phase, the two datasets data1 and data2 are each read in by mappers and parsed into key-value pairs, with the join field as the key and the query fields as the value; each pair is tagged with its source, data1 or data2.
(2) In the reduce phase, each reduce task receives all records from data1 and data2 that share the same key and performs a cross-product join on the reduce side. The most direct consequence is heavy memory consumption, which can cause an OOM. A minimal sketch of both phases follows.
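Here is a hedged sketch of a reduce-side join. The class names, the "data1"/"data2" source tags, and the field positions are illustrative assumptions based on the movies/ratings data described below; they are not code from the original article.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// The mapper tags every record with its source file; the reducer then
// buffers the two tagged groups per key and emits their cross product.
class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String tag; // which source file this map task is reading

    @Override
    protected void setup(Context context) {
        String name = ((FileSplit) context.getInputSplit()).getPath().getName();
        tag = name.startsWith("movies") ? "data1" : "data2";
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("::");
        // Join key: movieid is field 0 in movies.dat, field 1 in ratings.dat
        String joinKey = tag.equals("data1") ? fields[0] : fields[1];
        context.write(new Text(joinKey), new Text(tag + "::" + value.toString()));
    }
}

class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> left = new ArrayList<String>();  // records from data1
        List<String> right = new ArrayList<String>(); // records from data2
        for (Text v : values) {
            String s = v.toString();
            if (s.startsWith("data1::")) {
                left.add(s.substring(7));
            } else {
                right.add(s.substring(7));
            }
        }
        // Cross-product join; buffering both sides of a key is exactly
        // where the memory pressure described above comes from
        for (String l : left) {
            for (String r : right) {
                context.write(key, new Text(l + "\t" + r));
            }
        }
    }
}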
Now MapJoin:
MapJoin fits the case where one of the two datasets is small. The approach: load the small dataset entirely into memory and build an index on the join key. The large dataset then serves as the normal MapTask input, and every call to map() probes the in-memory index to perform the join, emitting the joined results by key. This approach uses Hadoop's DistributedCache to ship the small dataset to the compute nodes: every node running a map task loads it into memory and indexes it by the join key.
(The mappers read the big table; the small table is loaded into memory beforehand in the setup() method. See the reconstructed mapper sketch after the driver code below.)
2. Requirements
Given two datasets, movies.dat and ratings.dat, with the following sample contents:
movies.dat:
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
Field meanings: movieid, moviename, movietype
ratings.dat:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
Field meanings: userid, movieid, rate, timestamp
Join the two tables so that the final output contains the following six fields:
movieid, userid, rate, moviename, movietype, timestamp
3. Implementation
Step 1: Encapsulate a MovieRate bean for convenient sorting and serialization.
package com.ghgj.mr.mymapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Bean holding one joined record. Implementing WritableComparable makes it
// serializable between tasks and sortable by the framework.
public class MovieRate implements WritableComparable<MovieRate> {
    private String movieid;
    private String userid;
    private int rate;
    private String movieName;
    private String movieType;
    private long ts;

    public String getMovieid() {
        return movieid;
    }
    public void setMovieid(String movieid) {
        this.movieid = movieid;
    }
    public String getUserid() {
        return userid;
    }
    public void setUserid(String userid) {
        this.userid = userid;
    }
    public int getRate() {
        return rate;
    }
    public void setRate(int rate) {
        this.rate = rate;
    }
    public String getMovieName() {
        return movieName;
    }
    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }
    public String getMovieType() {
        return movieType;
    }
    public void setMovieType(String movieType) {
        this.movieType = movieType;
    }
    public long getTs() {
        return ts;
    }
    public void setTs(long ts) {
        this.ts = ts;
    }

    public MovieRate() {
    }

    public MovieRate(String movieid, String userid, int rate, String movieName,
            String movieType, long ts) {
        this.movieid = movieid;
        this.userid = userid;
        this.rate = rate;
        this.movieName = movieName;
        this.movieType = movieType;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return movieid + "\t" + userid + "\t" + rate + "\t" + movieName
                + "\t" + movieType + "\t" + ts;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(userid);
        out.writeInt(rate);
        out.writeUTF(movieName);
        out.writeUTF(movieType);
        out.writeLong(ts);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.movieid = in.readUTF();
        this.userid = in.readUTF();
        this.rate = in.readInt();
        this.movieName = in.readUTF();
        this.movieType = in.readUTF();
        this.ts = in.readLong();
    }

    // Sort by movieid, then by userid, both in descending order
    @Override
    public int compareTo(MovieRate mr) {
        int it = mr.getMovieid().compareTo(this.movieid);
        if (it == 0) {
            return mr.getUserid().compareTo(this.userid);
        } else {
            return it;
        }
    }
}
Step 2: Write the MapReduce program.
package com.ghgj.mr.mymapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingMapJoinMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // job.setJarByClass(MovieRatingMapJoinMR.class);
        job.setJar("/home/hadoop/mrmr.jar");

        job.setMapperClass(MovieRatingMapJoinMRMapper.class);
        job.setMapOutputKeyClass(MovieRate.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Map-side join: no reduce phase is needed
        // job.setReducerClass(MovieRatingMapJoinMReducer.class);
        // job.setOutputKeyClass(MovieRate.class);
        // job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);

        String minInput = args[0]; // the small table (movies.dat)
        String maxInput = args[1]; // the big table (ratings.dat)
        String output = args[2];

        // Only the big table goes through the normal input path
        FileInputFormat.setInputPaths(job, new Path(maxInput));
        Path outputPath = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Ship the small table to every task node via the distributed cache
        URI uri = new Path(minInput).toUri();
        job.addCacheFile(uri);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }
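    // NOTE: the original article's listing breaks off here, before the mapper
    // that main() registers. What follows is a hedged reconstruction of
    // MovieRatingMapJoinMRMapper based on the description above: setup() loads
    // the cached small table into a HashMap indexed by movieid, and map()
    // joins every ratings record against that map. Field positions follow the
    // data samples in section 2; everything else (cache-file lookup, null
    // handling) is an assumption, not the author's original code.
    static class MovieRatingMapJoinMRMapper
            extends Mapper<LongWritable, Text, MovieRate, NullWritable> {

        // movieid -> "moviename::movietype", built from the cached movies.dat
        private Map<String, String> movieMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // addCacheFile() localizes the file on every task node and links it
            // into the task's working directory under its base name
            URI[] cacheFiles = context.getCacheFiles();
            String fileName = new Path(cacheFiles[0].getPath()).getName();
            BufferedReader br = new BufferedReader(new FileReader(fileName));
            String line = null;
            while ((line = br.readLine()) != null) {
                // movies.dat line: movieid::moviename::movietype
                String[] fields = line.split("::");
                movieMap.put(fields[0], fields[1] + "::" + fields[2]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // ratings.dat line: userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            String userid = fields[0];
            String movieid = fields[1];
            int rate = Integer.parseInt(fields[2]);
            long ts = Long.parseLong(fields[3]);
            String movie = movieMap.get(movieid);
            if (movie == null) {
                return; // inner join: drop ratings with no matching movie
            }
            String[] mv = movie.split("::");
            context.write(new MovieRate(movieid, userid, rate, mv[0], mv[1], ts),
                    NullWritable.get());
        }
    }
}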
II. Custom OutputFormat: Categorized Data Output
Implementation: define a custom OutputFormat, override its RecordWriter, and override write(), the method that actually writes each output record.
package com.ghgj.mr.score_outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyScoreOutputFormat extends TextOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration configuration = job.getConfiguration();
        FileSystem fs = FileSystem.get(configuration);
        // Two fixed output files, one per category
        Path p1 = new Path("/score1/output1");
        Path p2 = new Path("/score2/output2");
        if (fs.exists(p1)) {
            fs.delete(p1, true);
        }
        if (fs.exists(p2)) {
            fs.delete(p2, true);
        }
        FSDataOutputStream fsdout1 = fs.create(p1);
        FSDataOutputStream fsdout2 = fs.create(p2);
        return new MyRecordWriter(fsdout1, fsdout2);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {
        FSDataOutputStream dout1 = null;
        FSDataOutputStream dout2 = null;

        public MyRecordWriter(FSDataOutputStream dout1, FSDataOutputStream dout2) {
            super();
            this.dout1 = dout1;
            this.dout2 = dout2;
        }

        @Override
        public void write(Text key, NullWritable value)
                throws IOException, InterruptedException {
            // The mapper prefixed each line with "1::" or "2::"; route on that tag
            String[] strs = key.toString().split("::");
            if (strs[0].equals("1")) {
                dout1.writeBytes(strs[1] + "\n");
            } else {
                dout2.writeBytes(strs[1] + "\n");
            }
        }

        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            IOUtils.closeStream(dout2);
            IOUtils.closeStream(dout1);
        }
    }
}
package com.ghgj.mr.score_outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreOutputFormatMR extends Configured implements Tool {

    // This run() method serves as the driver
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        job.setMapperClass(ScoreOutputFormatMRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);

        // TextInputFormat is the default input component
        job.setInputFormatClass(TextInputFormat.class);
        // TextOutputFormat would be the default output component;
        // use the custom OutputFormat instead
        // job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputFormatClass(MyScoreOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("/scorefmt"));
        // The custom OutputFormat writes the real data to its own fixed paths;
        // the framework still requires a configured output directory
        Path output = new Path("/scorefmt/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ScoreOutputFormatMR(), args);
        System.exit(run);
    }

    static class ScoreOutputFormatMRMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            // Tag each line with "1::" or "2::" depending on its field count;
            // MyScoreOutputFormat routes records on this tag
            if (split.length - 2 >= 6) {
                context.write(new Text("1::" + value.toString()), NullWritable.get());
            } else {
                context.write(new Text("2::" + value.toString()), NullWritable.get());
            }
        }
    }
}
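A note on the input data, which the original never shows: from the mapper logic one can infer tab-separated lines, where a line satisfying split.length - 2 >= 6 (at least eight fields) is tagged "1" and lands in /score1/output1, while shorter lines are tagged "2" and land in /score2/output2. The exact field layout, presumably a student name followed by per-course scores, is an assumption.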
III. Custom InputFormat: Merging Small Files
Step 1: Define the custom InputFormat.
package com.ghgj.mr.format.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {

    // Mark every small file as non-splittable so that each file
    // produces exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
Step 2: Write the custom RecordReader.
package com.ghgj.mr.format.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private FileSplit fileSplit;
    private Configuration conf;
    private Text value = new Text();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Allocate a byte array covering the whole logical split
            byte[] contents = new byte[(int) fileSplit.getLength()];
            // Locate the split in the file system via the FileSplit
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                // Open an input stream on the file
                in = fs.open(file);
                // Read the entire stream into the contents byte array
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close
    }
}
Step 3: Write the MapReduce program.
package com.ghgj.mr.format.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesConvertToBigMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesConvertToBigMR(), args);
        System.exit(exitCode);
    }

    static class SmallFilesConvertToBigMRMapper
            extends Mapper<NullWritable, Text, Text, Text> {
        private Text filenameKey;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Use the path of the current split (i.e. the whole small file) as the key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    static class SmallFilesConvertToBigMRReducer
            extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text filename, Iterable<Text> bytes, Context context)
                throws IOException, InterruptedException {
            // Each file yields exactly one record, so the first value is its content
            context.write(NullWritable.get(), bytes.iterator().next());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf, "combine small files to bigfile");

        job.setJarByClass(SmallFilesConvertToBigMR.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(SmallFilesConvertToBigMRMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(SmallFilesConvertToBigMRReducer.class);

        // TextInputFormat is the default input component
        // job.setInputFormatClass(TextInputFormat.class);
        // Use the custom WholeFileInputFormat instead of the default
        job.setInputFormatClass(WholeFileInputFormat.class);

        Path input = new Path("/smallfiles");
        Path output = new Path("/bigfile");
        FileInputFormat.setInputPaths(job, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        int status = job.waitForCompletion(true) ? 0 : 1;
        return status;
    }
}
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/7724.html