接下来通过一个实际的案例,介绍在MR编程中的,partition、sort、combiner。
流量统计项目案例
数据样本:
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4
2052.flash3-http.qq.com 综合门户 15 12 1938 2910 200
字段介绍:
需求:
1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
2、 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中
需求一:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
通过需求分析 ,我们可以知道这里可以使用combiner这个优化组件,它的作用是在 maptask之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输。
使用方式:Combiner 和 Reducer 一样,编写一个类,然后继承 Reducer,reduce 方法中写具体的 Combiner逻辑,然后在 job 中设置 Combiner 组件:job.setCombinerClass(FlowSumCombine.class)
代码实现:
public class FlowSum {
//job
public static void main(String[] args) {
Configuration conf = new Configuration(true);
conf.set("fs.defaultFS", "hdfs://zzy:9000");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job = Job.getInstance(conf);
job.setJobName("FlowSum");
//设置任务类
job.setJarByClass(FlowSum.class);
//设置Mapper Reducer Combine
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setCombinerClass(FlowSumCombine.class);
//设置map 和reduce 的输入输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 指定该 mapreduce 程序数据的输入和输出路径
Path input=new Path("/data/input");
Path output =new Path("/data/output");
//一定要保证output不存在
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //递归删除
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
// 最后提交任务
boolean success = job.waitForCompletion(true);
System.exit(success?0:-1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
private class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
//统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
//1363157984040
// 13602846565
// 5C-0E-8B-8B-B6-00:CMCC
// 120.197.40.4
//2052.flash3-http.qq.com
// 综合门户
// 15
// 12
// 1938 上行流量
// 2910 下行流量
// 200
Text mk=new Text();
Text mv=new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("//s+");
String phone = fields[0];
String upFlow = fields[8];
String downFlow = fields[9];
mk.set(phone);
mv.set(upFlow+"_"+downFlow);
context.write(mk,mv);
}
}
//Combiner
private class FlowSumCombine extends Reducer<Text, Text,Text, Text> {
Text rv=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int upFlowSum=0;
int downFlowSum=0;
int upFlow = 0;
int downFlow =0;
for(Text value:values){
String fields[]=value.toString().split("_");
upFlow=Integer.parseInt(fields[0]);
downFlow=Integer.parseInt(fields[1]);
upFlowSum+=upFlow;
downFlowSum+=downFlow;
}
rv.set(upFlowSum+"_"+downFlowSum);
context.write(key,rv);
}
}
//Reducer
private class MyReducer extends Reducer<Text, Text,Text, Text> {
Text rv=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int upFlowSum=0;
int downFlowSum=0;
int upFlow = 0;
int downFlow =0;
for(Text value:values){
String fields[]=value.toString().split("_");
upFlow=Integer.parseInt(fields[0]);
downFlow=Integer.parseInt(fields[1]);
upFlowSum+=upFlow;
downFlowSum+=downFlow;
}
rv.set(upFlowSum+"//t"+downFlowSum);
context.write(key,rv);
}
}
}
使用 Combiner 注意事项
- Combiner 的输出 kv 类型应该跟 Reducer 的输入 kv 类型对应起来
- Combiner 的输入 kv 类型应该跟 Mapper 的输出 kv 类型对应起来
- Combiner 的使用要非常谨慎,因为 Combiner 在MapReduce 过程中可能调用也可能不调用,可能调一次也可能调多次
- Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果
需求二:得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
分析:如果是在得出上行流量和下行流量之后,实现倒叙排序呢,之前在java中,如果想让对象按照自定义的规则排序,那么就需要自定义对象并且实现它的比较器。MR也可以,在MR运行过程中,如果有Reducer阶段的话,那么是一定会排序的,根据对象的比较器,进行排序,将排序结果相同的key分到一个reduceTask中。
实现:这里我们可以使用hadoop自定义WritableComparable来自定义对象,并且实现它的比较器。
代码实现(注意:这里的是对需求一实现之后的结果进行处理的):
自定义对象,实现比较器:
public class FlowBean implements WritableComparable<FlowBean> {
private String phone;
private long upFlow;
private long downFlow;
private long sumFlow;
// 序列化框架在反序列化操作创建对象实例时会调用无参构造
public FlowBean(){
}
public void set(String phone, long upFlow, long downFlow){
this.phone=phone;
this.upFlow=upFlow;
this.downFlow=downFlow;
this.sumFlow=upFlow+downFlow;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
// 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.phone);
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);
}
/*
反序列化方法,这里注意,上面的write中是什么顺写,这里就要什么顺序读取
要保证字段数据类型一一对应
*/
@Override
public void readFields(DataInput in) throws IOException {
this.phone=in.readUTF();
this.upFlow=in.readLong();
this.downFlow=in.readLong();
this.sumFlow=in.readLong();
}
//这里就是要实现的比较方法
// 0 表示相等, 1表示大于 负数表示小于
@Override
public int compareTo(FlowBean o) {
//倒叙输出的话就是,参数的属性-类中的属性
return (int)(o.sumFlow-this.sumFlow);
}
}
MR程序:
public class FlowSumSort {
//job
public static void main(String[] args) {
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://zzy:9000");
conf.set("fs.defaultFS", "hdfs://zzy:9000");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job= Job.getInstance(conf);
job.setJarByClass(FlowSumSort.class);
job.setJobName("FlowSumSort");
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(NullWritable.class);
// 指定该 mapreduce 程序数据的输入和输出路径
Path input=new Path("//data/output");
Path output =new Path("/data/output1");
//一定要保证output不存在
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //递归删除
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
boolean success=job.waitForCompletion(true);
System.exit(success?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
private class MyMapper extends Mapper<LongWritable,Text,FlowBean,NullWritable> {
FlowBean bean=new FlowBean();
NullWritable mv=NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("//s+");
String phone=fields[0];
long upflow= Long.parseLong(fields[1]);
long downflow= Long.parseLong(fields[2]);
bean.set(phone,upflow,downflow);
context.write(bean,mv);
}
}
//Reducer
private class MyReducer extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable>{
@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
for(NullWritable value:values){
context.write(key,value);
}
}
}
}
注意:虽然这里reduce阶段只是做了一个输出,没有任何业务操作,但是不能不使用reduce,因为MR的排序就是在MapTask运行完成之后,向reduceTask端输出数据的时候,才会进行排序,如果没有Reduce阶段就不会有排序。
需求三:根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到省级范围进行
分析:默认的在运行MR程序的时候,只运行一个ReducerTask,默认额的一个ReducerTask会有一个输出文件,那么只要自定义分区规则,并且设置好ReducerTask的个数,就可以完成以上需求,默认的MR的分区规则是,key的hashcode%分区数。
代码实现:
自定义分区:
//这里的两个泛型,是Mpapper端输出的key和value的类型
private static class MyPartitioner extends Partitioner<Text,FlowBean> {
//自定义的分区规则
private static HashMap<String, Integer> provincMap = new HashMap<String, Integer>();
static {
provincMap.put("138", 0);
provincMap.put("139", 1);
provincMap.put("136", 2);
provincMap.put("137", 3);
provincMap.put("135", 4);
}
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//千万注意,这里的返回值,一定不能大于numPartitions,否则会报错
Integer code = provincMap.get(text.toString().substring(0, 3));
if(code!=null){
return code;
}
return 5;
}
}
job:
//指定分区规则,和分区个数
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(5);
注意:
- 在使用分区时,一定注意,分区的返回值一定不能大于设置的reduceTask个数
- 虽然设置多个ReduceTask可以增加并行度,但是也不需要设置太多,如果某个ReduceTask中没有数据,那么这个ReduceTask就是空跑,浪费资源
- 尽量的在设置分区时是从0开始的连续的整数
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/197195.html