Hadoop——使用IDEA开发WordCount on Yarn详解编程语言

首先将之前配置好的mapred-site.xml和yarn-site.xml拷贝进resources文件夹,并在pom.xml中加入

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-client</artifactId> 
            <version>2.6.5</version> 
        </dependency> 

具体代码及解释如下:

package com.hadoop.mapreduce.wc; 
 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 
import java.io.IOException; 
 
/** 
 * @author Song X 
 * @date 2020/02/29 
 */ 
public class MyWordCount {
    
 
 
    /** 
     * 用于解析配置文件 
     */ 
    private static Configuration configuration = null; 
 
    /** 
     * 用于提交计算任务给集群 
     */ 
    private static Job job = null; 
 
    public static void init() throws Exception {
    
        configuration = new Configuration(true); 
        job = Job.getInstance(configuration); 
        job.setJarByClass(MyWordCount.class); 
        //起一个job名字 
        job.setJobName("Test_WordCount"); 
 
        //指定输入和输出数据路径 
        Path inputPath = new Path("/data/wc/input"); 
        TextInputFormat.addInputPath(job, inputPath); 
 
        //写这句话时要保证这个目录不存在,不然会报错 
        //为防止我们事先不知道存不存在,可以加下面的代码,若存在则先删除,注意正式开发时,小心使用这个代码,防止误删数据!!! 
        //if(outputPath.getFileSystem(configuration).exists(outputPath)){
    
        //      outputPath.getFileSystem(configuration).delete(outputPath, true); 
        //} 
        Path outputPath = new Path("/data/wc/output"); 
        TextOutputFormat.setOutputPath(job, outputPath); 
 
    } 
 
    private static void compute() throws Exception {
    
        //设置Map计算类 
        job.setMapperClass(MyMapper.class); 
 
        //需要在这里告诉Reduce端传来的key-value都是什么类型,方便Reduce端做序列化和反序列化 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(IntWritable.class); 
 
        //设置Reduce计算类 
        job.setReducerClass(MyReduce.class); 
 
        //提交任务,并等待完成 
        job.waitForCompletion(true); 
    } 
 
 
    public static void main(String[] args) throws Exception{
    
        init(); 
        compute(); 
    } 
 
} 
package com.hadoop.mapreduce.wc; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
 
import java.io.IOException; 
import java.util.StringTokenizer; 
 
/** 
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
 * 不管在map还是reduce,都会牵扯到Java的序列化和反序列化 
 * Text和IntWritable就是Hadoop对String和int的序列化包装类型 
 * 如果是自己开发的的类型,必须实现序列化、反序列化接口 
 * 
 * 在map和reduce中,依照所学,排序是很重要的,而排序实际上就是比较,所以自己开发的类型也需要实现比较器接口,定义自己的比较规则 
 * 
 * @author Frank 
 * @date 2020/02/29 
 */ 
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    
 
    /** 
     * 比如这样的数据hello hadoop 1 
     *              hello hadoop 2 
     * 经过map完的记过应该是hello 1 
     *                     hello 1 
     *                     hadoop 1 
     *                     hadoop 1 
     *                     1 1 
     *                     2 1 
     * 所以你会发现,key永远在变,而value是不变的(正是因为value不变,所以定义为static final对性能是最优的) 
     */ 
 
 
    /** 
     * 这就是Map输出类型key-value的value 
     */ 
    private static final IntWritable one = new IntWritable(1); 
 
    /** 
     * 这就是输出类型key-value的key 
     */ 
    private Text word = new Text(); 
 
    /** 
     * hello hadoop 1 
     * hello hadoop 2 
     * @param key 是每一行字符串第一个字符面向源文件的偏移量,比如上面第一行的h偏移量是0,第二行的h偏移量是15(换行符也算,第一个h偏移量是0) 
     * @param value 字符串本身,比如hello hadoop 1。所以我们其实只是要处理value 
     * @param context 
     * @throws IOException 
     * @throws InterruptedException 
     */ 
    @Override 
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    
        //java的一个工具类,使用正则的方式对字符串进行空白切割,得到一个包含所有单词的迭代器 
        StringTokenizer itr = new StringTokenizer(value.toString()); 
 
        while(itr.hasMoreTokens()){
    
            //将单词赋给word 
            word.set(itr.nextToken()); 
            //将单词和1合并输出 
            context.write(word, one); 
        } 
    } 
 
} 
package com.hadoop.mapreduce.wc; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
 
import java.io.IOException; 
 
/** 
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
 * Reducer的输入类型很明显就是Map的输出类型,Reduce的输出类型由我们自己指定 
 * @author 
 * @date 2020/02/29 
 */ 
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    /** 
     * 回想ReduceTask,是相同的key为一组,一组数据调用一次reduce方法 
     * 所以reduce收到的数据应该长这样 
     *                      hello 1 
     *                      hello 1 
     *                      hello 1 
     *                        ... 
     * 
     */ 
 
    /** 
     * 最后key-value结果的value 
     */ 
    private IntWritable result = new IntWritable(); 
 
    /** 
     * 
     * @param key 
     * @param values 我们会得到一个value的迭代器 
     * @param context 
     * @throws IOException 
     * @throws InterruptedException 
     */ 
    @Override 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
    
        //业务代码其实很简单,就是对同一个key做value的累加 
        int sum = 0; 
        for(IntWritable value : values){
    
            sum += value.get(); 
        } 
 
        result.set(sum); 
        context.write(key, result); 
    } 
} 

写好代码后点击package将程序打包成jar包i
在这里插入图片描述
然后会在目录中生成对应的jar包在这里插入图片描述
将其拷贝到节点上,使用hadoop jar com.hadoop.mapreduce.wc.MyWordCount进行运行
注意:第三个参数是程序的入口类,第四个参数和第五个参数分别是数据的input path和output path,我在程序中写死了,这里就不用传了

Hadoop入门系列到此就更新结束了

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/20591.html

(0)
上一篇 2021年7月19日 23:34
下一篇 2021年7月19日 23:35

相关推荐

发表回复

登录后才能评论