使用MapReduce实现温度排序详解大数据

温度排序代码,具体说明可以搜索其他博客

KeyPair.java

package temperaturesort; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.WritableComparable; 
 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
 
public class KeyPair implements WritableComparable<KeyPair> { 
    private int hot; 
    private int year; 
 
    public int getYear() { 
        return year; 
    } 
 
    public void setYear(int year) { 
        this.year = year; 
    } 
 
    public int getHot() { 
        return hot; 
    } 
 
    public void setHot(int hot) { 
        this.hot = hot; 
    } 
 
    public int compareTo(KeyPair o) { 
        int result = this.year-o.getYear(); 
        if(result!=0){ 
            return result<0?-1:1; 
        } 
        return -( this.hot < o.getHot() ? -1 :(this.hot == o.getHot()?0:1)); 
    } 
 
    public void write(DataOutput dataOutput) throws IOException { 
        dataOutput.writeInt(year); 
        dataOutput.writeInt(hot); 
    } 
 
    public void readFields(DataInput dataInput) throws IOException { 
        this.year=dataInput.readInt(); 
        this.hot=dataInput.readInt(); 
    } 
 
    @Override 
    public String toString() { 
        return year+"/t"+hot; 
    } 
 
    @Override 
    public int hashCode() { 
        return new Integer(year+hot).hashCode(); 
    } 
}

Sort.java:

package temperaturesort; 
 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
 
/**
 * Shuffle sort comparator for the secondary sort: orders map-output keys by
 * year ascending, then temperature descending, so each year's hottest reading
 * reaches the reducer first.
 */
public class Sort extends WritableComparator { 
    public Sort() { 
        // 'true' tells WritableComparator to instantiate KeyPair objects so the
        // object-based compare(WritableComparable, WritableComparable) is used.
        super(KeyPair.class, true); 
    } 

    @Override 
    public int compare(WritableComparable a, WritableComparable b) { 
        KeyPair key1 = (KeyPair) a; 
        KeyPair key2 = (KeyPair) b; 
        // Integer.compare avoids the overflow pitfall of subtracting ints.
        int byYear = Integer.compare(key1.getYear(), key2.getYear()); 
        if (byYear != 0) { 
            return byYear; 
        } 
        // Operands swapped on purpose: hotter readings sort first within a year.
        return Integer.compare(key2.getHot(), key1.getHot()); 
    } 
}

Partition.java:

package temperaturesort; 
 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
 
/**
 * Partitions map output by year so every record of a given year lands on the
 * same reducer (a requirement for the year-based grouping comparator to work).
 */
public class Partition extends Partitioner<KeyPair, Text> { 
    @Override 
    public int getPartition(KeyPair keyPair, Text text, int num) { 
        // Mask the sign bit so the result is non-negative even if the
        // multiplication overflows — Partitioner must return a value in [0, num).
        return (keyPair.getYear() * 127 & Integer.MAX_VALUE) % num; 
    } 
}

Group.java:

package temperaturesort; 
 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
 
/**
 * Reduce-side grouping comparator: two keys belong to the same reduce call
 * when their years match, regardless of temperature. Combined with the sort
 * comparator, each reducer invocation sees one year's values hottest-first.
 */
public class Group extends WritableComparator { 
    public Group() { 
        // 'true' => create KeyPair instances so the object compare below runs.
        super(KeyPair.class, true); 
    } 

    @Override 
    public int compare(WritableComparable a, WritableComparable b) { 
        KeyPair left = (KeyPair) a; 
        KeyPair right = (KeyPair) b; 
        // Compare years only; temperature is deliberately ignored for grouping.
        return Integer.compare(left.getYear(), right.getYear()); 
    } 
}

RunJob.java:

package temperaturesort; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
import java.io.IOException; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.Calendar; 
import java.util.Date; 
 
 
/**
 * Driver for the temperature secondary-sort job.
 *
 * <p>Input lines are expected as {@code "yyyy-MM-dd HH:mm:ss<TAB><temp>C"}.
 * The job sorts readings by year ascending / temperature descending, and
 * groups reduce calls by year.
 */
public class RunJob { 
    public static class TempSortMapper extends Mapper<Object, Text, KeyPair, Text> { 
        // NOTE(review): SimpleDateFormat is not thread-safe; this is safe only
        // because each map task runs single-threaded — do not share further.
        static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 

        @Override 
        protected void map(Object key, Text value, Context context) 
                throws IOException, InterruptedException { 
            String line = value.toString(); 
            // BUG FIX: original split on the literal string "/t"; the fields
            // are tab-separated, so split on "\t".
            String[] strArr = line.split("\t"); 
            if (strArr.length == 2) { 
                try { 
                    Date date = simpleDateFormat.parse(strArr[0]); 
                    Calendar calendar = Calendar.getInstance(); 
                    calendar.setTime(date); 
                    // Calendar.YEAR replaces the magic number 1 (same value).
                    int year = calendar.get(Calendar.YEAR); 
                    // Temperature is everything before the trailing "C", e.g. "32C" -> 32.
                    int hot = Integer.parseInt(strArr[1].substring(0, strArr[1].indexOf("C"))); 
                    KeyPair keyPair = new KeyPair(); 
                    keyPair.setHot(hot); 
                    keyPair.setYear(year); 
                    context.write(keyPair, value); 
                } catch (ParseException e) { 
                    // Best-effort: log and skip malformed records rather than
                    // failing the whole task.
                    e.printStackTrace(); 
                } 
            } 
        } 
    } 

    /** Emits each grouped (key, value) pair as-is; ordering comes from the shuffle sort. */ 
    public static class TempSortReducer extends Reducer<KeyPair, Text, KeyPair, Text> { 
        @Override 
        protected void reduce(KeyPair key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
            for (Text text : values) { 
                context.write(key, text); 
            } 
        } 
    } 

    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        conf.set("mapreduce.app-submission.cross-platform", "true"); 
        Path fileInput = new Path("hdfs://mycluster/testFile/hot.txt"); 
        Path fileOutput = new Path("hdfs://mycluster/output/hot"); 

        Job job = Job.getInstance(conf, "temperatureSort"); 
        job.setJar("E://bigData//hadoopDemo//out//artifacts//wordCount_jar//hadoopDemo.jar"); 
        job.setJarByClass(RunJob.class); 
        job.setMapperClass(TempSortMapper.class); 
        job.setReducerClass(TempSortReducer.class); 
        job.setMapOutputKeyClass(KeyPair.class); 
        job.setMapOutputValueClass(Text.class); 

        // Secondary-sort wiring: sort by (year asc, hot desc), partition and
        // group by year so each reduce call sees exactly one year's readings.
        job.setNumReduceTasks(3); 
        job.setSortComparatorClass(Sort.class); 
        job.setPartitionerClass(Partition.class); 
        job.setGroupingComparatorClass(Group.class); 

        FileInputFormat.addInputPath(job, fileInput); 
        FileOutputFormat.setOutputPath(job, fileOutput); 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}

其中自定义的sort和partition是在mapTask任务之后使用的,而Group是在reduce任务使用的。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9430.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论