Hadoop——使用IDEA开发WordCount on Yarn详解编程语言

首先将之前配置好的mapred-site.xml和yarn-site.xml拷贝进resources文件夹,并在pom.xml中加入

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-client</artifactId> 
            <version>2.6.5</version> 
        </dependency> 

具体代码及解释如下:

package com.hadoop.mapreduce.wc; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import java.io.IOException; 
/** 
* @author Song X 
* @date 2020/02/29 
*/ 
public class MyWordCount {
 
/** 
* 用于解析配置文件 
*/ 
private static Configuration configuration = null; 
/** 
* 用于提交计算任务给集群 
*/ 
private static Job job = null; 
public static void init() throws Exception {
 
configuration = new Configuration(true); 
job = Job.getInstance(configuration); 
job.setJarByClass(MyWordCount.class); 
//起一个job名字 
job.setJobName("Test_WordCount"); 
//指定输入和输出数据路径 
Path inputPath = new Path("/data/wc/input"); 
TextInputFormat.addInputPath(job, inputPath); 
//写这句话时要保证这个目录不存在,不然会报错 
//为防止我们事先不知道存不存在,可以加下面的代码,若存在则先删除,注意正式开发时,小心使用这个代码,防止误删数据!!! 
//if(outputPath.getFileSystem(configuration).exists(outputPath)){
 
//      outputPath.getFileSystem(configuration).delete(outputPath, true); 
//} 
Path outputPath = new Path("/data/wc/output"); 
TextOutputFormat.setOutputPath(job, outputPath); 
} 
private static void compute() throws Exception {
 
//设置Map计算类 
job.setMapperClass(MyMapper.class); 
//需要在这里告诉Reduce端传来的key-value都是什么类型,方便Reduce端做序列化和反序列化 
job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(IntWritable.class); 
//设置Reduce计算类 
job.setReducerClass(MyReduce.class); 
//提交任务,并等待完成 
job.waitForCompletion(true); 
} 
public static void main(String[] args) throws Exception{
 
init(); 
compute(); 
} 
} 
package com.hadoop.mapreduce.wc; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.IOException; 
import java.util.StringTokenizer; 
/** 
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
* 不管在map还是reduce,都会牵扯到Java的序列化和反序列化 
* Text和IntWritable就是Hadoop对String和int的序列化包装类型 
* 如果是自己开发的的类型,必须实现序列化、反序列化接口 
* 
* 在map和reduce中,依照所学,排序是很重要的,而排序实际上就是比较,所以自己开发的类型也需要实现比较器接口,定义自己的比较规则 
* 
* @author Frank 
* @date 2020/02/29 
*/ 
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
 
/** 
* 比如这样的数据hello hadoop 1 
*              hello hadoop 2 
* 经过map完的记过应该是hello 1 
*                     hello 1 
*                     hadoop 1 
*                     hadoop 1 
*                     1 1 
*                     2 1 
* 所以你会发现,key永远在变,而value是不变的(正是因为value不变,所以定义为static final对性能是最优的) 
*/ 
/** 
* 这就是Map输出类型key-value的value 
*/ 
private static final IntWritable one = new IntWritable(1); 
/** 
* 这就是输出类型key-value的key 
*/ 
private Text word = new Text(); 
/** 
* hello hadoop 1 
* hello hadoop 2 
* @param key 是每一行字符串第一个字符面向源文件的偏移量,比如上面第一行的h偏移量是0,第二行的h偏移量是15(换行符也算,第一个h偏移量是0) 
* @param value 字符串本身,比如hello hadoop 1。所以我们其实只是要处理value 
* @param context 
* @throws IOException 
* @throws InterruptedException 
*/ 
@Override 
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 
//java的一个工具类,使用正则的方式对字符串进行空白切割,得到一个包含所有单词的迭代器 
StringTokenizer itr = new StringTokenizer(value.toString()); 
while(itr.hasMoreTokens()){
 
//将单词赋给word 
word.set(itr.nextToken()); 
//将单词和1合并输出 
context.write(word, one); 
} 
} 
} 
package com.hadoop.mapreduce.wc; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import java.io.IOException; 
/** 
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
* Reducer的输入类型很明显就是Map的输出类型,Reduce的输出类型由我们自己指定 
* @author 
* @date 2020/02/29 
*/ 
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
 
/** 
* 回想ReduceTask,是相同的key为一组,一组数据调用一次reduce方法 
* 所以reduce收到的数据应该长这样 
*                      hello 1 
*                      hello 1 
*                      hello 1 
*                        ... 
* 
*/ 
/** 
* 最后key-value结果的value 
*/ 
private IntWritable result = new IntWritable(); 
/** 
* 
* @param key 
* @param values 我们会得到一个value的迭代器 
* @param context 
* @throws IOException 
* @throws InterruptedException 
*/ 
@Override 
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
 
//业务代码其实很简单,就是对同一个key做value的累加 
int sum = 0; 
for(IntWritable value : values){
 
sum += value.get(); 
} 
result.set(sum); 
context.write(key, result); 
} 
} 

写好代码后点击package将程序打包成jar包i
在这里插入图片描述
然后会在目录中生成对应的jar包在这里插入图片描述
将其拷贝到节点上,使用hadoop jar com.hadoop.mapreduce.wc.MyWordCount进行运行
注意:第三个参数是程序的入口类,第四个参数和第五个参数分别是数据的input path和output path,我在程序中写死了,这里就不用传了

Hadoop入门系列到此就更新结束了

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20591.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论