问题分析
MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用:在分布式环境DistributedCache.getLocalCacheFiles()/在伪分布式环境DistributedCache.getCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Q1SumDeptSalary extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept文件中的数据 private Map<String, String> deptMap = new HashMap<String, String>(); private String[] kv; // 此方法会在Map方法执行之前执行且执行一次,【缓存阶段】 @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 URI[] paths = DistributedCache.getCacheFiles(context.getConfiguration()); String deptIdName = null; for (URI path : paths) { // 对部门文件字段进行拆分并缓存到deptMap中 if (path.toString().contains("dept")) { in = new BufferedReader(new FileReader(path.toString())); while (null != (deptIdName = in.readLine())) { // 对部门文件字段进行拆分并缓存到deptMap中 // 其中Map中key为部门编号,value为所在部门名称 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); } } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } } // 【map阶段】 输入的文件自动作为map的输入参数,一行为一个key public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 kv = value.toString().split(","); // map join: 在map阶段过滤掉不需要的数据(保证只有deptMap中存在的部门号), // 在此阶段完成了emp中的部门号和dept中的部门名之间的关联,输出key为部门名称和value为员工工资 if (deptMap.containsKey(kv[7])) { if (null != kv[5] && !"".equals(kv[5].toString())) { context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); // System.out.println("<"+deptMap.get(kv[7].trim())+"--"+ // kv[5].trim()+">"); } } } } public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 将多个相同key的小键值对 在shuffle阶段已经汇聚为一个整体键值对 // 对同一部门的员工工资进行求和 long sumSalary = 0; for (Text val : values) { sumSalary += Long.parseLong(val.toString()); } // 输出key为部门名称和value为该部门员工工资总和 context.write(key, new LongWritable(sumSalary)); // System.out.println("<"+key+" --"+sumSalary+">"); } } @Override public int run(String[] args) throws Exception { // 实例化作业对象,设置作业名称、Mapper和Reduce类 Job job = new Job(getConf(), "Q1SumDeptSalary"); job.setJobName("Q1SumDeptSalary"); job.setJarByClass(Q1SumDeptSalary.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); // 设置输入格式类 job.setInputFormatClass(TextInputFormat.class); // 设置输出格式 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); DistributedCache.addCacheFile(new URI(otherArgs[0]), job.getConfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } /** * 主方法,执行入口 * * @param args * 输入参数 */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args); System.exit(res); } }
用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9807.html