[笔记]MapReduce入门详解大数据

MapReduce原理

分布式运算思想

场景需求

有一个日志文件,记录了一些搜索关键词搜索的记录,xx.log
按照 搜索时间 搜索关键词 ….. 等等属性为规则的一行行排列,使用分布式运行大概有以下几个步骤:
这里写图片描述

  • 1:任务资源的分发(jar文件,运算程序)
  • 1.1:服务器运算资源对各个任务的分配
  • 2:任务在各个节点上设置运行环境,启动执行
  • 3:监视各个节点上任务的执行状态
  • 4:如果有任务失败,还要设法重试
  • 5:中间结果的调度、汇总
    就以上几点来说,如果我们自己实现的话,随便一个实现起来都非常的困难,所以hadoop提供了解决方案:
    1、1.1、2 — yarn 资源调度集群框架(hadoop原生)
    3、4、5 — mapreduce分布式计算模型框架(hadoop原生) 其他的有:spark、storm

mapreduce 官方示例

提示:需要启动分布式中的yarn服务哦

wordcount(统计)

示例需求:在hdfis中某个目录下,有一些日志文件,需要统计该文件中单词出现的次数

我们使用 hadoop提供的示例jar包来运行分析:(首先得模拟一些单词日志文件上传到hdfs上)
参数说明:运行什么例子 需要分析的数据文件/文件夹 统计结果输出目录

hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /wordCount/data/ /wordCount/output 
 
运行完毕之后,使用fs的查看文本命令查看输出的目录:hadoop fs -cat /wordCount/output/part-r-00000

pi(计算圆周率)

示例需求:计算圆周率

hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar pi 10 10

MapReduce Helloworld

wordcount(统计)实现

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。

mapper

import java.io.IOException; 
 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
 
/** 
 *  
 * @ClassName: WordCountMapper  
 * @Description:
  
 * KEYIN, 输入的偏移量  
 * VALUEIN, 输入的(读取)的value(一行文本)  
 * KEYOUT, 输出的key 
 * VALUEOUT,输出的value 
 * 

* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午2:52:54
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
// 偏移量、读取到的文本、处理之后的每个单词、单词出现的次数(输出一次为1)
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" "); // 切分数据
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}

reducer

import java.io.IOException; 
 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
/** 
 *  
 * @ClassName: WordCountReducer  
 * @Description: 
 
 * 参数依次是: 
 * mapper中的输出类型(前两个) 
 * reducer中的输出类型(后两个) 
 * 

* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午7:50:46
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
InterruptedException {
//统计工作
long count = 0;
//遍历values,累加到计数器中
for (LongWritable v : values) {
count+=v.get();
}
//输出一个单词key及其总次数
context.write(new Text(key), new LongWritable(count));
}
}

driver运行入口配置信息

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
public class WordCountDriver { 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        // 构造一个job对象来封装本mapreduce业务到所有信息 
        Job wcjob = Job.getInstance(conf); 
 
        // 指定本job工作用到的jar包位置 
        wcjob.setJarByClass(WordCountDriver.class); 
        // 指定本job用到的mapper类 
        wcjob.setMapperClass(WordCountMapper.class); 
        // 指定本job用到的reducer类 
        wcjob.setReducerClass(WordCountReducer.class); 
 
        // 指定mapper输出的kv类型 
        wcjob.setMapOutputKeyClass(Text.class); 
        wcjob.setMapOutputValueClass(LongWritable.class); 
 
        // 指定reducer输出到kv数据类型,(setOutputKeyClass 
        // 会对mapper和reducer都起作用,如果上面mapper不设置的话) 
        wcjob.setOutputKeyClass(org.apache.hadoop.io.Text.class); 
        wcjob.setOutputValueClass(LongWritable.class); 
 
        // 指定程序处理到输入数据所在的路径 
        FileInputFormat.setInputPaths(wcjob, new Path("/wordCount/data/")); 
        // 指定程序处理到输出结果所在路径 
        FileOutputFormat.setOutputPath(wcjob, new Path("/wordCount/output/")); 
 
        // 将该job通过yarn的客户端进行提交 
        wcjob.waitForCompletion(true); 
    } 
} 

那么问题来了,怎么运行该程序呢?

方式1:利用hadoop命令提交(集群运行模式)

  • 1:首先把上面写好的程序打成jar包,上传到linux服务器中
    注意:如果上传到hdfs中,运行hdfs中的jar的时候会提示:Not a valid JAR: /wordcount.jar
  • 2:使用hadoop -jar 运行

    hadoop jar /wordCount/jar/WordCount.jar cn.mapperReduce.mr1.WordCountDriver


    注:如果打jar包的时候打成的是runJar,则这里可不用指定入口类

方式2:直接在eclipse中运行driver main函数(local本地运行模式:方便调试)仅限linux下

直接运行上面的代码会抛出错误:

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/wordCount/data

**需要注意的点有2点:**

  • 输入输出目录,要是hdfs目录
  • 要指定jar包在本机上的位置(我们把jar包打到当前项目classPath下)
//1:指定为hdfs文件系统 
conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 
//2:指定jar包位置 
conf.set("mapreduce.job.jar", "WordCount.jar");

运行程序后可以发现:job编号为 local,此种方式只是在本机运行
2015-04-12 07:31:05,637 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) – Job job_local601569298_0001 completed successfully

替代方案:

//1:指定为hdfs文件系统
conf.set(“fs.defaultFS”, “hdfs://192.168.184.141:9000”);
该行代码可以用xml文件来替代:复制hadoop配置好的xml文件到clallPath下,也可以:
core-site.xml 和 hdfs-site.xml
原理:读取配置文件中的hdfs系统配置,替代代码


方式3:在eclipse中运行集群模式(仅限于linux)

基于方式2:加入了yarn配置:
mapred-site.xml
yarn-site.xml
把以上两个配置文件扔到classpath路径下
原理:程序在运行的时候,读取到配置信息:mapreduce.framework.name 运行在yarn上,所以这样就能把程序运行在集群中了,方便跟踪源码了解过程和调试
可以发现info中显示job 没有了local,成功了运行

2015-04-12 07:44:34,324 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_1428810427154_0003 completed successfully

运算完成之后,可以通过 cat 命令,到输出目录中查看结果
hadoop fs -cat /wordCount/output/part-r-00000

[hadoop@had01 ~]$ hadoop fs -cat /wordCount/output/part-r-00000 
dst     1 
param   2 
path    2 
src     1 
zhuqiang        3

mapreduce运行逻辑

这里写图片描述
入门逻辑:
一个节点:
由InputFormat组件从hdfs上面读取需要分析的文件,每次读取一行输送给mapper,mapper处理完之后,交给shuffle把同一个key进行组合,排序之后,交给reducer,reducer进行汇总,并输出,再由OutputFormat输出结果到指定目录

yarn工作机制及job提交流程

这里写图片描述

小程序练习

统计日志流量

数据示例:

1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200 
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200

统计每个手机号码的上下行和总流量

mapReduce 程序,由于比较简单,写到同一个类中的

/** 
 * 统计每个号码的上行下行流量汇总 
 *  
 * @author hadoop 
 * 
 */ 
public class FlowCount { 
 
    public static class FlowMapper extends 
            Mapper<LongWritable, Text, Text, FlowInfo> { 
        @Override 
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException { 
            // 每行数据示例:1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 
            // 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 
            String[] split = StringUtils.split(value.toString(), "/t"); 
            Long up_flow = new Long(split[split.length - 3]); 
            Long down_flow = new Long(split[split.length - 2]); 
            context.write(new Text(split[1]), new FlowInfo(up_flow, down_flow)); 
        } 
    } 
 
    public static class FlowReducer extends 
            Reducer<Text, FlowInfo, Text, FlowInfo> { 
        @Override 
        protected void reduce(Text key, Iterable<FlowInfo> values, 
                Context context) throws IOException, InterruptedException { 
            Long up_flow = 0l; 
            Long down_flow = 0l; 
 
            for (FlowInfo ff : values) { 
                up_flow += ff.getUp_flow(); 
                down_flow += ff.getDown_flow(); 
            } 
            context.write(new Text(key), new FlowInfo(up_flow, down_flow)); 
        } 
    } 
 
    public static void main(String[] args) throws IOException, 
            ClassNotFoundException, InterruptedException { 
        // 配置信息 
        Configuration conf = new Configuration(); 
        conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 
        conf.set("mapreduce.job.jar", "FlowCount.jar"); 
        Job job = Job.getInstance(conf); 
        // 指定jar包 
        job.setJarByClass(FlowCount.class); 
        job.setMapperClass(FlowMapper.class); 
        job.setReducerClass(FlowReducer.class); 
        // 指定输入输出类型 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(FlowInfo.class); 
        // 指定数据所在目录 
        FileInputFormat.setInputPaths(job, new Path("/wordCount/data/http/")); 
        // 制定结果输出目录 
        FileOutputFormat.setOutputPath(job, new Path("/wordCount/output/")); 
        job.waitForCompletion(true); 
    } 
}

由于输出的value需要多个业务参数:上行流量、下行流量,总流量,就需要自定义vaue

import org.apache.hadoop.io.WritableComparable; 
 
/** 
 * 流量信息,自定义value,需要注意: 
 * 1:必须留一个空参数构造,否则会抛出:init方法未找到 
 * 2:需要实现序列化和反序列化 
 *  
 * @author hadoop 
 * 
 */ 
public class FlowInfo implements WritableComparable<FlowInfo> { 
    private long up_flow; // 上行 
    private long down_flow; // 下行 
    private long sum_flow; // 总流量 
 
    public FlowInfo(long up_flow, long down_flow) { 
        super(); 
        this.up_flow = up_flow; 
        this.down_flow = down_flow; 
        this.sum_flow = up_flow + down_flow; 
    } 
 
    public FlowInfo() { 
        super(); 
    } 
 
    public long getUp_flow() { 
        return up_flow; 
    } 
 
    public void setUp_flow(long up_flow) { 
        this.up_flow = up_flow; 
    } 
 
    public long getDown_flow() { 
        return down_flow; 
    } 
 
    public void setDown_flow(long down_flow) { 
        this.down_flow = down_flow; 
    } 
 
    public long getSum_flow() { 
        return sum_flow; 
    } 
 
    public void setSum_flow(long sum_flow) { 
        this.sum_flow = sum_flow; 
    } 
 
    @Override 
    public String toString() { 
        //key,value 是默认/t ,所以这里分割需要注意下 
        return this.up_flow + "/t" + this.down_flow + "/t" + sum_flow; 
    } 
 
    // 序列化方法,把需要序列化的字段输出 
    @Override 
    public void write(DataOutput out) throws IOException { 
        out.writeLong(this.up_flow); 
        out.writeLong(this.down_flow); 
        out.writeLong(this.sum_flow); 
    } 
 
    // 反序列化 
    @Override 
    public void readFields(DataInput in) throws IOException { 
        this.up_flow = in.readLong(); 
        this.down_flow = in.readLong(); 
        this.sum_flow = in.readLong(); 
    } 
 
    @Override 
    public int compareTo(FlowInfo o) { 
        // TODO Auto-generated method stub 
        return 0; 
    } 
 
} 

注意:自定义key,value 如果不实现WritableComparable将会抛出以下异常,
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:988)
原理:因为文件或许会很大,有策略当容量达到一定大小的时候就会把数据溢出到本地文件,所以需要实现hadoop的序列化接口
统计部分结果:

13480253104     3600 3600 7200 
13502468823     146700 2206980 2353680 
13560436666     35970 256350 292320 
13560439658     40680 117840 158520 
13602846565     38760 58200 96960 
13660577991     139200 13800 153000 
13719199419     4800 0 4800 
13726230503     24810 246810 271620 
13726238888     49620 493620 543240

按总流量降序排序

直接在上面程序的统计结果上,再排序。
排序之前就得弄明白,排序是在什么阶段完成的。看下图
mapReduce的一个核心就在shuffle了,从图上看出,排序是按key来进行排序的,所以从这里入手
这里写图片描述

思路:让把flowInfo设置为key,并覆写compareTo方法,来返回大小
在reducer中输出的时候把key和value再换回来
compareTo:(flowInfo的代码和上面程序的一致,只是compartTo代码变成了下面这样)

    @Override 
    public int compareTo(FlowInfo o) { 
        return this.sum_flow < o.getSum_flow()?1:-1;  //小于返回1,大于或则等于返回-1 降序 
    }

FlowCountSort

/** 
 * 在上一个程序产生的结果中排序 
 *  
 * @author hadoop 
 * 
 */ 
public class FlowCountSort { 
/** 
 * 上个文件产生的结果示例:需求:按总流量降序 
 *  13480253104     3600 3600 7200 
    13502468823     146700 2206980 2353680 
    13560436666     35970 256350 292320 
    13560439658     40680 117840 158520 
 * 
 */ 
    public static class FlowMapper extends 
            Mapper<LongWritable, Text, FlowInfo, Text> { 
        @Override 
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException { 
            String[] split = StringUtils.split(value.toString(), "/t"); 
            Long up_flow = new Long(split[1]); 
            Long down_flow = new Long(split[2]); 
            context.write(new FlowInfo(up_flow, down_flow),new Text(split[0])); 
        } 
    } 
 
    public static class FlowReducer extends 
            Reducer<FlowInfo,Text, Text, FlowInfo> { 
        @Override 
        protected void reduce(FlowInfo key, Iterable<Text> values,Context context) 
                throws IOException, InterruptedException { 
            //由于读取的源文件已经是汇总过的了,所以values中只会存在一个元素。 
            Text phone = values.iterator().next(); 
            context.write(phone, key); 
        } 
    } 
 
    public static void main(String[] args) throws IOException, 
            ClassNotFoundException, InterruptedException { 
        // 配置信息 
        Configuration conf = new Configuration(); 
        conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 
        conf.set("mapreduce.job.jar", "FlowCountSort.jar"); 
        Job job = Job.getInstance(conf); 
        // 指定jar包 
        job.setJarByClass(FlowCountSort.class); 
        job.setMapperClass(FlowMapper.class); 
        job.setReducerClass(FlowReducer.class); 
        //指定map输出类型 
        job.setMapOutputKeyClass(FlowInfo.class); 
        job.setMapOutputValueClass(Text.class); 
        // 指定reducer输出类型 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(FlowInfo.class); 
        // 指定数据所在目录 
        FileInputFormat.setInputPaths(job, new Path("/wordCount/output/")); 
        // 制定结果输出目录 
        FileOutputFormat.setOutputPath(job, new Path("/wordCount/output2/")); 
        job.waitForCompletion(true); 
    } 
} 

分组统计

需求:把某一部分号码归为一类统计

从运行的进程上来看,(在运行mapreduce程序的时候,我在每台机器上进行jps查看进程,发现)
影响YarnChild(mapper)数量,和文件的数量有关,而默认YarnChild(reducer)只有一个,一个reducer那么汇总和输出的文件就只有一份

Partitioner编程

  1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

  2. HashPartitioner是mapreduce的默认partitioner。计算方法是
    which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

import java.util.HashMap; 
 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
 
public class MyPartitioner extends Partitioner<Text, FlowInfo> { 
    private static HashMap<String, Integer> map = new HashMap<String, Integer>(); 
    static { 
        map.put("135", 0); 
        map.put("136", 1); 
        map.put("137", 2); 
        map.put("138", 3); 
    } 
 
    @Override 
    public int getPartition(Text key, FlowInfo value, int numPartitions) { 
        String str = key.toString().substring(0, 3); 
        if (map.containsKey(str)) { 
            return map.get(str); 
        } 
        return 4; // 没找到的分组交给第4组reducer处理 
    } 
 
} 

针对第一个程序,的main入口中的job设定Partitioner。

job.setPartitionerClass(MyPartitioner.class); 
job.setNumReduceTasks(5); //设置有5组reduceTasks处理,需要和Partitioner里面预期分组的服务数相同

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9633.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论