MapReduce原理
分布式运算思想
场景需求
有一个日志文件,记录了一些搜索关键词搜索的记录,xx.log
按照 搜索时间 搜索关键词 ….. 等等属性为规则的一行行排列,使用分布式运行大概有以下几个步骤:
- 1:任务资源的分发(jar文件,运算程序)
- 1.1:服务器运算资源对各个任务的分配
- 2:任务在各个节点上设置运行环境,启动执行
- 3:监视各个节点上任务的执行状态
- 4:如果有任务失败,还要设法重试
- 5:中间结果的调度、汇总
就以上几点来说,如果我们自己实现的话,随便一个实现起来都非常的困难,所以hadoop提供了解决方案:
1、1.1、2 — yarn 资源调度集群框架(hadoop原生)
3、4、5 — mapreduce分布式计算模型框架(hadoop原生) 其他的有:spark、storm
mapreduce 官方示例
提示:需要启动分布式中的yarn服务哦
wordcount(统计)
示例需求:在hdfis中某个目录下,有一些日志文件,需要统计该文件中单词出现的次数
我们使用 hadoop提供的示例jar包来运行分析:(首先得模拟一些单词日志文件上传到hdfs上)
参数说明:运行什么例子 需要分析的数据文件/文件夹 统计结果输出目录
hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /wordCount/data/ /wordCount/output
运行完毕之后,使用fs的查看文本命令查看输出的目录:hadoop fs -cat /wordCount/output/part-r-00000
pi(计算圆周率)
示例需求:计算圆周率
hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar pi 10 10
MapReduce Helloworld
wordcount(统计)实现
MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。
mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @ClassName: WordCountMapper
* @Description:
* KEYIN, 输入的偏移量
* VALUEIN, 输入的(读取)的value(一行文本)
* KEYOUT, 输出的key
* VALUEOUT,输出的value
*
* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午2:52:54
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
// 偏移量、读取到的文本、处理之后的每个单词、单词出现的次数(输出一次为1)
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" "); // 切分数据
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}
reducer
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @ClassName: WordCountReducer
* @Description:
* 参数依次是:
* mapper中的输出类型(前两个)
* reducer中的输出类型(后两个)
*
* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午7:50:46
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
InterruptedException {
//统计工作
long count = 0;
//遍历values,累加到计数器中
for (LongWritable v : values) {
count+=v.get();
}
//输出一个单词key及其总次数
context.write(new Text(key), new LongWritable(count));
}
}
driver运行入口配置信息
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 构造一个job对象来封装本mapreduce业务到所有信息
Job wcjob = Job.getInstance(conf);
// 指定本job工作用到的jar包位置
wcjob.setJarByClass(WordCountDriver.class);
// 指定本job用到的mapper类
wcjob.setMapperClass(WordCountMapper.class);
// 指定本job用到的reducer类
wcjob.setReducerClass(WordCountReducer.class);
// 指定mapper输出的kv类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(LongWritable.class);
// 指定reducer输出到kv数据类型,(setOutputKeyClass
// 会对mapper和reducer都起作用,如果上面mapper不设置的话)
wcjob.setOutputKeyClass(org.apache.hadoop.io.Text.class);
wcjob.setOutputValueClass(LongWritable.class);
// 指定程序处理到输入数据所在的路径
FileInputFormat.setInputPaths(wcjob, new Path("/wordCount/data/"));
// 指定程序处理到输出结果所在路径
FileOutputFormat.setOutputPath(wcjob, new Path("/wordCount/output/"));
// 将该job通过yarn的客户端进行提交
wcjob.waitForCompletion(true);
}
}
那么问题来了,怎么运行该程序呢?
方式1:利用hadoop命令提交(集群运行模式)
- 1:首先把上面写好的程序打成jar包,上传到linux服务器中
注意:如果上传到hdfs中,运行hdfs中的jar的时候会提示:Not a valid JAR: /wordcount.jar - 2:使用hadoop -jar 运行
hadoop jar /wordCount/jar/WordCount.jar cn.mapperReduce.mr1.WordCountDriver
注:如果打jar包的时候打成的是runJar,则这里可不用指定入口类
方式2:直接在eclipse中运行driver main函数(local本地运行模式:方便调试)仅限linux下
直接运行上面的代码会抛出错误:
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/wordCount/data
**需要注意的点有2点:**
- 输入输出目录,要是hdfs目录
- 要指定jar包在本机上的位置(我们把jar包打到当前项目classPath下)
//1:指定为hdfs文件系统
conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000");
//2:指定jar包位置
conf.set("mapreduce.job.jar", "WordCount.jar");
运行程序后可以发现:job编号为 local,此种方式只是在本机运行
2015-04-12 07:31:05,637 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) – Job job_local601569298_0001 completed successfully
替代方案:
//1:指定为hdfs文件系统
conf.set(“fs.defaultFS”, “hdfs://192.168.184.141:9000”);
该行代码可以用xml文件来替代:复制hadoop配置好的xml文件到clallPath下,也可以:
core-site.xml 和 hdfs-site.xml
原理:读取配置文件中的hdfs系统配置,替代代码
方式3:在eclipse中运行集群模式(仅限于linux)
基于方式2:加入了yarn配置:
mapred-site.xml
yarn-site.xml
把以上两个配置文件扔到classpath路径下
原理:程序在运行的时候,读取到配置信息:mapreduce.framework.name 运行在yarn上,所以这样就能把程序运行在集群中了,方便跟踪源码了解过程和调试
可以发现info中显示job 没有了local,成功了运行
2015-04-12 07:44:34,324 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_1428810427154_0003 completed successfully
运算完成之后,可以通过 cat 命令,到输出目录中查看结果
hadoop fs -cat /wordCount/output/part-r-00000
[hadoop@had01 ~]$ hadoop fs -cat /wordCount/output/part-r-00000
dst 1
param 2
path 2
src 1
zhuqiang 3
mapreduce运行逻辑
入门逻辑:
一个节点:
由InputFormat组件从hdfs上面读取需要分析的文件,每次读取一行输送给mapper,mapper处理完之后,交给shuffle把同一个key进行组合,排序之后,交给reducer,reducer进行汇总,并输出,再由OutputFormat输出结果到指定目录
yarn工作机制及job提交流程
小程序练习
统计日志流量
数据示例:
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
统计每个手机号码的上下行和总流量
mapReduce 程序,由于比较简单,写到同一个类中的
/**
* 统计每个号码的上行下行流量汇总
*
* @author hadoop
*
*/
public class FlowCount {
public static class FlowMapper extends
Mapper<LongWritable, Text, Text, FlowInfo> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 每行数据示例:1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY
// 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
String[] split = StringUtils.split(value.toString(), "/t");
Long up_flow = new Long(split[split.length - 3]);
Long down_flow = new Long(split[split.length - 2]);
context.write(new Text(split[1]), new FlowInfo(up_flow, down_flow));
}
}
public static class FlowReducer extends
Reducer<Text, FlowInfo, Text, FlowInfo> {
@Override
protected void reduce(Text key, Iterable<FlowInfo> values,
Context context) throws IOException, InterruptedException {
Long up_flow = 0l;
Long down_flow = 0l;
for (FlowInfo ff : values) {
up_flow += ff.getUp_flow();
down_flow += ff.getDown_flow();
}
context.write(new Text(key), new FlowInfo(up_flow, down_flow));
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
// 配置信息
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000");
conf.set("mapreduce.job.jar", "FlowCount.jar");
Job job = Job.getInstance(conf);
// 指定jar包
job.setJarByClass(FlowCount.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 指定输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowInfo.class);
// 指定数据所在目录
FileInputFormat.setInputPaths(job, new Path("/wordCount/data/http/"));
// 制定结果输出目录
FileOutputFormat.setOutputPath(job, new Path("/wordCount/output/"));
job.waitForCompletion(true);
}
}
由于输出的value需要多个业务参数:上行流量、下行流量,总流量,就需要自定义vaue
import org.apache.hadoop.io.WritableComparable;
/**
* 流量信息,自定义value,需要注意:
* 1:必须留一个空参数构造,否则会抛出:init方法未找到
* 2:需要实现序列化和反序列化
*
* @author hadoop
*
*/
public class FlowInfo implements WritableComparable<FlowInfo> {
private long up_flow; // 上行
private long down_flow; // 下行
private long sum_flow; // 总流量
public FlowInfo(long up_flow, long down_flow) {
super();
this.up_flow = up_flow;
this.down_flow = down_flow;
this.sum_flow = up_flow + down_flow;
}
public FlowInfo() {
super();
}
public long getUp_flow() {
return up_flow;
}
public void setUp_flow(long up_flow) {
this.up_flow = up_flow;
}
public long getDown_flow() {
return down_flow;
}
public void setDown_flow(long down_flow) {
this.down_flow = down_flow;
}
public long getSum_flow() {
return sum_flow;
}
public void setSum_flow(long sum_flow) {
this.sum_flow = sum_flow;
}
@Override
public String toString() {
//key,value 是默认/t ,所以这里分割需要注意下
return this.up_flow + "/t" + this.down_flow + "/t" + sum_flow;
}
// 序列化方法,把需要序列化的字段输出
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.up_flow);
out.writeLong(this.down_flow);
out.writeLong(this.sum_flow);
}
// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.up_flow = in.readLong();
this.down_flow = in.readLong();
this.sum_flow = in.readLong();
}
@Override
public int compareTo(FlowInfo o) {
// TODO Auto-generated method stub
return 0;
}
}
注意:自定义key,value 如果不实现WritableComparable将会抛出以下异常,
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:988)
原理:因为文件或许会很大,有策略当容量达到一定大小的时候就会把数据溢出到本地文件,所以需要实现hadoop的序列化接口
统计部分结果:
13480253104 3600 3600 7200
13502468823 146700 2206980 2353680
13560436666 35970 256350 292320
13560439658 40680 117840 158520
13602846565 38760 58200 96960
13660577991 139200 13800 153000
13719199419 4800 0 4800
13726230503 24810 246810 271620
13726238888 49620 493620 543240
按总流量降序排序
直接在上面程序的统计结果上,再排序。
排序之前就得弄明白,排序是在什么阶段完成的。看下图
mapReduce的一个核心就在shuffle了,从图上看出,排序是按key来进行排序的,所以从这里入手
思路:让把flowInfo设置为key,并覆写compareTo方法,来返回大小
在reducer中输出的时候把key和value再换回来
compareTo:(flowInfo的代码和上面程序的一致,只是compartTo代码变成了下面这样)
@Override
public int compareTo(FlowInfo o) {
return this.sum_flow < o.getSum_flow()?1:-1; //小于返回1,大于或则等于返回-1 降序
}
FlowCountSort
/**
* 在上一个程序产生的结果中排序
*
* @author hadoop
*
*/
public class FlowCountSort {
/**
* 上个文件产生的结果示例:需求:按总流量降序
* 13480253104 3600 3600 7200
13502468823 146700 2206980 2353680
13560436666 35970 256350 292320
13560439658 40680 117840 158520
*
*/
public static class FlowMapper extends
Mapper<LongWritable, Text, FlowInfo, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] split = StringUtils.split(value.toString(), "/t");
Long up_flow = new Long(split[1]);
Long down_flow = new Long(split[2]);
context.write(new FlowInfo(up_flow, down_flow),new Text(split[0]));
}
}
public static class FlowReducer extends
Reducer<FlowInfo,Text, Text, FlowInfo> {
@Override
protected void reduce(FlowInfo key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
//由于读取的源文件已经是汇总过的了,所以values中只会存在一个元素。
Text phone = values.iterator().next();
context.write(phone, key);
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
// 配置信息
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000");
conf.set("mapreduce.job.jar", "FlowCountSort.jar");
Job job = Job.getInstance(conf);
// 指定jar包
job.setJarByClass(FlowCountSort.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//指定map输出类型
job.setMapOutputKeyClass(FlowInfo.class);
job.setMapOutputValueClass(Text.class);
// 指定reducer输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowInfo.class);
// 指定数据所在目录
FileInputFormat.setInputPaths(job, new Path("/wordCount/output/"));
// 制定结果输出目录
FileOutputFormat.setOutputPath(job, new Path("/wordCount/output2/"));
job.waitForCompletion(true);
}
}
分组统计
需求:把某一部分号码归为一类统计
从运行的进程上来看,(在运行mapreduce程序的时候,我在每台机器上进行jps查看进程,发现)
影响YarnChild(mapper)数量,和文件的数量有关,而默认YarnChild(reducer)只有一个,一个reducer那么汇总和输出的文件就只有一份
Partitioner编程
-
Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
-
HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, FlowInfo> {
private static HashMap<String, Integer> map = new HashMap<String, Integer>();
static {
map.put("135", 0);
map.put("136", 1);
map.put("137", 2);
map.put("138", 3);
}
@Override
public int getPartition(Text key, FlowInfo value, int numPartitions) {
String str = key.toString().substring(0, 3);
if (map.containsKey(str)) {
return map.get(str);
}
return 4; // 没找到的分组交给第4组reducer处理
}
}
针对第一个程序,的main入口中的job设定Partitioner。
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(5); //设置有5组reduceTasks处理,需要和Partitioner里面预期分组的服务数相同
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9633.html