The Road to Learning HBase (5): Operating HBase with MapReduce

Reading data from HDFS with MapReduce and writing it to HBase

Suppose HDFS contains a file student.txt with the following format:

95002,刘晨,女,19,IS 
95017,王风娟,女,18,IS 
95018,王一,女,19,IS 
95013,冯伟,男,21,CS 
95014,王小丽,女,19,CS 
95019,邢小丽,女,19,IS 
95020,赵钱,男,21,IS 
95003,王敏,女,22,MA 
95004,张立,男,19,IS 
95012,孙花,女,20,CS 
95010,孔小涛,男,19,CS 
95005,刘刚,男,18,MA 
95006,孙庆,男,23,CS 
95007,易思玲,女,19,MA 
95008,李娜,女,18,CS 
95021,周二,男,17,MA 
95022,郑明,男,20,MA 
95001,李勇,男,20,CS 
95011,包小柏,男,18,MA 
95009,梦圆圆,女,18,MA 
95015,王君,男,18,MA

The goal is to write the data in this HDFS file into an HBase table.
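
Before the job runs, the target table must already exist: the reducer below writes into a table named student with a single column family info (the HBase shell equivalent is create 'student','info'). As a minimal sketch, assuming the HBase 1.x client API and the same ZooKeeper quorum used in the driver below, the table could also be created from Java like this (the class name CreateStudentTable is only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateStudentTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as in the driver below
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {

            TableName tableName = TableName.valueOf("student");
            if (!admin.tableExists(tableName)) {
                // one column family "info", matching the Puts built by the reducer below
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}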

The MapReduce implementation is as follows:

import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
public class ReadHDFSDataToHbaseMR extends Configured implements Tool{ 
 
    public static void main(String[] args) throws Exception { 
         
        int run = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args); 
        System.exit(run); 
    } 
 
    @Override 
    public int run(String[] arg0) throws Exception { 
 
        Configuration conf = HBaseConfiguration.create(); 
        conf.set("fs.defaultFS", "hdfs://myha01/"); 
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181"); 
        System.setProperty("HADOOP_USER_NAME", "hadoop"); 
        FileSystem fs = FileSystem.get(conf); 
//        conf.addResource("config/core-site.xml"); 
//        conf.addResource("config/hdfs-site.xml"); 
         
        Job job = Job.getInstance(conf); 
         
        job.setJarByClass(ReadHDFSDataToHbaseMR.class); 
         
        job.setMapperClass(HDFSToHbaseMapper.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(NullWritable.class); 
 
        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job, null, null, null, null, false); 
        job.setOutputKeyClass(NullWritable.class); 
        job.setOutputValueClass(Put.class); 
         
        Path inputPath = new Path("/student/input/"); 
        Path outputPath = new Path("/student/output/"); 
         
        if(fs.exists(outputPath)) { 
            fs.delete(outputPath,true); 
        } 
         
        FileInputFormat.addInputPath(job, inputPath); 
        FileOutputFormat.setOutputPath(job, outputPath); 
         
        boolean isDone = job.waitForCompletion(true); 
         
        return isDone ? 0 : 1; 
    } 
     
     
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ 
         
        @Override 
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException { 
            // pass each CSV line through unchanged; the reducer parses it and builds the Put 
            context.write(value, NullWritable.get()); 
        } 
         
    } 
     
    /** 
     * Each reducer key is one CSV line, e.g. 95015,王君,男,18,MA 
     */ 
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable>{ 
         
        @Override 
        protected void reduce(Text key, Iterable<NullWritable> values,Context context) 
                throws IOException, InterruptedException { 
             
            String[] split = key.toString().split(","); 
             
            Put put = new Put(split[0].getBytes()); 
             
            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes()); 
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes()); 
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes()); 
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes()); 
             
            context.write(NullWritable.get(), put); 
         
        } 
         
    } 
     
}
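
After the job completes, a quick sanity check is to read one row back from the student table. Below is a minimal sketch, assuming the HBase 1.x client API and the connection settings from the driver above; the row key 95001 and the expected values come from the sample file (the class name CheckStudentRow is only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckStudentRow {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as in the driver above
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("student"))) {

            // fetch one row written by the job above; 95001 comes from the sample file
            Result result = table.get(new Get(Bytes.toBytes("95001")));
            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String age  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
            System.out.println(name + ", " + age); // expected: 李勇, 20
        }
    }
}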

Reading data from HBase with MapReduce to compute the average age and store it in HDFS
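
With the 21 sample records loaded above, the ages sum to 405, so the job should report an average of 405 / 21 ≈ 19.29. The implementation is as follows: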

import java.io.IOException; 
import java.util.List; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.Cell; 
import org.apache.hadoop.hbase.CellUtil; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.hbase.mapreduce.TableMapper; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
 
 
public class ReadHbaseDataToHDFS extends Configured implements Tool{ 
 
    public static void main(String[] args) throws Exception { 
         
        int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args); 
        System.exit(run); 
         
    } 
 
    @Override 
    public int run(String[] arg0) throws Exception { 
 
        Configuration conf = HBaseConfiguration.create(); 
        conf.set("fs.defaultFS", "hdfs://myha01/"); 
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181"); 
        System.setProperty("HADOOP_USER_NAME", "hadoop"); 
        FileSystem fs = FileSystem.get(conf); 
//        conf.addResource("config/core-site.xml"); 
//        conf.addResource("config/hdfs-site.xml"); 
         
        Job job = Job.getInstance(conf); 
         
        job.setJarByClass(ReadHbaseDataToHDFS.class); 
         
         
        // only fetch the column this job actually needs: info:age 
        Scan scan = new Scan(); 
        scan.addColumn("info".getBytes(), "age".getBytes()); 
         
        TableMapReduceUtil.initTableMapperJob( 
                "student".getBytes(), // table to read from 
                scan, // scan restricting which columns are fetched 
                HbaseToHDFSMapper.class, // mapper class 
                Text.class,     // map output key type 
                IntWritable.class, // map output value type 
                job, // the job object 
                false // addDependencyJars: assumes the HBase jars are already on the cluster classpath 
                ); 
     
 
        job.setReducerClass(HbaseToHDFSReducer.class); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(DoubleWritable.class); 
         
        Path outputPath = new Path("/student/avg/"); 
         
        if(fs.exists(outputPath)) { 
            fs.delete(outputPath,true); 
        } 
         
        FileOutputFormat.setOutputPath(job, outputPath); 
         
        boolean isDone = job.waitForCompletion(true); 
         
        return isDone ? 0 : 1; 
    } 
     
    public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable>{ 
         
        Text outKey = new Text("age"); 
        IntWritable outValue = new IntWritable(); 
        // key is the row key of the HBase row 
        // value is a Result holding every cell returned for that row key 
        @Override 
        protected void map(ImmutableBytesWritable key, Result value,Context context) 
                throws IOException, InterruptedException { 
             
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes()); 
         
            if(isContainsColumn) { 
                 
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes()); 
                System.out.println("listCells:/t"+listCells); 
                Cell cell = listCells.get(0); 
                System.out.println("cells:/t"+cell); 
                 
                byte[] cloneValue = CellUtil.cloneValue(cell); 
                String ageValue = Bytes.toString(cloneValue); 
                outValue.set(Integer.parseInt(ageValue)); 
                 
                context.write(outKey,outValue); 
                 
            } 
             
        } 
         
    } 
     
    public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{ 
         
        DoubleWritable outValue = new DoubleWritable(); 
         
        @Override 
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) 
                throws IOException, InterruptedException { 
             
            // every age arrives under the single key "age", so this one reduce call sees them all 
            int count = 0; 
            int sum = 0; 
            for(IntWritable value : values) { 
                count++; 
                sum += value.get(); 
            } 
             
            double avgAge = sum * 1.0 / count; 
            outValue.set(avgAge); 
            context.write(key, outValue); 
        } 
         
    } 
     
}
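
Assuming the job runs with the default single reduce task and TextOutputFormat, the result lands in /student/avg/ as a file named part-r-00000 containing one tab-separated line: the key age followed by the computed average (about 19.29 for the sample data). A minimal sketch for printing it back through the HDFS API, mirroring the settings of the driver above (the class name PrintAvgResult is only illustrative):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintAvgResult {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumption: same HDFS nameservice and user as in the drivers above
        conf.set("fs.defaultFS", "hdfs://myha01/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        // default single reducer + TextOutputFormat => one output file named part-r-00000
        Path resultFile = new Path("/student/avg/part-r-00000");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(resultFile)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // e.g. "age" <TAB> average value
            }
        }
    }
}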

 

This is an original article by 奋斗; if you republish it, please credit the source: https://blog.ytso.com/tech/bigdata/9011.html
