MapReduce操作HBase详解大数据

运行HBase时常会遇到个错误,我就有这样的经历。 

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

检查日志:org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)

如果是这个错误,说明是HBase自带的hadoop-core jar与集群上实际运行的Hadoop版本不一致,导致RPC协议版本不匹配。解决方法:将hbase/lib目录下的hadoop-core的jar文件删除,将hadoop目录下的hadoop-0.20.2-core.jar拷贝到hbase/lib下面,然后重新启动hbase即可。第二种常见原因是:没有启动hadoop,应先启动hadoop,再启动hbase。

在Eclipse开发中,需要加入hadoop所有的jar包以及HBase的两个jar包(hbase,zookeeper)。

HBase基础可见帖子:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html

  1. 建表,通过HBaseAdmin类的实例方法createTable()来创建表。
  2. HTable类用于操作表,构造时传入配置和表名。例如,实例方法put()可以插入数据(行键在构造Put对象时传入),实例方法getScanner()可以扫描某一列族上的所有数据,返回ResultScanner,遍历它可以得到每一行对应的Result对象;Result类中的实例方法getFamilyMap()可以获得以列名为key,值为value的映射,这刚好与hadoop中map的<key,value>结果形式是一样的。
  3. 复制代码
    package test; 
    import java.io.IOException; 
    import java.util.Map; 
    import org.apache.hadoop.conf.Configuration; 
    import org.apache.hadoop.hbase.HBaseConfiguration; 
    import org.apache.hadoop.hbase.HColumnDescriptor; 
    import org.apache.hadoop.hbase.HTableDescriptor; 
    import org.apache.hadoop.hbase.client.HBaseAdmin; 
    import org.apache.hadoop.hbase.client.HTable; 
    import org.apache.hadoop.hbase.client.Put; 
    import org.apache.hadoop.hbase.client.Result; 
     
    public class Htable { 
     
        /** 
         * @param args 
         */ 
        public static void main(String[] args) throws IOException { 
            // TODO Auto-generated method stub 
            Configuration hbaseConf = HBaseConfiguration.create(); 
            HBaseAdmin admin = new HBaseAdmin(hbaseConf); 
            HTableDescriptor htableDescriptor = new HTableDescriptor("table" 
                    .getBytes());  //set the name of table 
            htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters 
            admin.createTable(htableDescriptor); //create a table  
            HTable table = new HTable(hbaseConf, "table"); //get instance of table. 
            for (int i = 0; i < 3; i++) {   //for is number of rows 
                Put putRow = new Put(("row" + i).getBytes()); //the ith row 
                putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1" 
                        .getBytes());  //set the name of column and value. 
                putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2" 
                        .getBytes()); 
                putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3" 
                        .getBytes()); 
                table.put(putRow); 
            } 
            for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters  
                for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result 
                    String column = new String(entry.getKey()); 
                    String value = new String(entry.getValue()); 
                    System.out.println(column+","+value); 
                } 
            } 
            admin.disableTable("table".getBytes()); //disable the table 
            admin.deleteTable("table".getBytes());  //drop the tbale 
        } 
    }
    复制代码

    以上代码不难看懂。

下面介绍一下,用mapreduce怎样操作HBase,包括向HBase中写入数据以及从HBase中读取数据。

现在有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。

复制代码
package test; 
 
import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
 
public class MapperClass extends Mapper<LongWritable,Text,Text,Text>{ 
        public void map(LongWritable key,Text value,Context context)thorws IOException{ 
            String[] items = value.toString().split(" "); 
            String k = items[0]; 
            String v = items[1];          
            context.write(new Text(k), new Text(v)); 
    } 
 
}
复制代码

Reduce类,主要是将键值传到HBase表中

复制代码
package test; 
 
import java.io.IOException; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.io.Text; 
 
public class ReducerClass extends TableReducer<Text,Text,ImmutableBytesWritable>{ 
    public void reduce(Text key,Iterable<Text> values,Context context){ 
        String k = key.toString(); 
        StringBuffer str=null; 
        for(Text value: values){ 
            str.append(value.toString()); 
        } 
        String v = new String(str);  
        Put putrow = new Put(k.getBytes()); 
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());      
    } 
}
复制代码

由上面可知ReducerClass继承TableReducer,而在普通的hadoop作业里面ReducerClass继承Reducer类。TableReducer的原型为:TableReducer<KeyIn,ValueIn,KeyOut>,可以看出,这里写入HBase时输出的Key类型是ImmutableBytesWritable。

Map,Reduce,以及Job的配置分离,比较清晰,mahout也是采用这种构架。

复制代码
package test; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.util.Tool; 
 
public class Driver extends Configured implements Tool{ 
 
    @Override 
    public static void run(String[] arg0) throws Exception { 
        // TODO Auto-generated method stub 
        Configuration conf = HBaseConfiguration.create(); 
        conf.set("hbase.zookeeper.quorum.", "localhost");   
         
        Job job = new Job(conf,"Hbase"); 
        job.setJarByClass(TxtHbase.class); 
         
        Path in = new Path(arg0[0]); 
         
        job.setInputFormatClass(TextInputFormat.class); 
        FileInputFormat.addInputPath(job, in); 
         
        job.setMapperClass(MapperClass.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(Text.class); 
         
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job); 
         
       job.waitForCompletion(true); 
    } 
     
}
复制代码

Driver中job配置的时候没有设置 job.setReducerClass(); 而是用 TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job); 来指定reduce类以及要写入的表。

主函数

复制代码
package test; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.util.ToolRunner; 
 
/**
 * Command-line entry point that launches {@code Driver} through
 * {@link ToolRunner}.
 */
public class TxtHbase {

    /**
     * Runs the driver and exits the JVM with the driver's return code.
     *
     * @param args command-line arguments, forwarded to the driver by ToolRunner
     * @throws Exception if the driver fails
     */
    public static void main(String [] args) throws Exception{
        // ToolRunner.run is the three-argument launcher; the original called
        // Driver.run with these arguments and an undefined THDriver class.
        int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exitCode);
    }
}
复制代码

 

读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了。

复制代码
package test; 
 
import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapred.TableMap; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 
 
public class MapperClass extends MapReduceBase implements 
        TableMap<Text, Text> { 
    static final String NAME = "GetDataFromHbaseTest"; 
    private Configuration conf; 
 
    public void map(ImmutableBytesWritable row, Result values, 
            OutputCollector<Text, Text> output, Reporter reporter) 
            throws IOException { 
        StringBuilder sb = new StringBuilder(); 
        for (Entry<byte[], byte[]> value : values.getFamilyMap( 
                "fam1".getBytes()).entrySet()) { 
            String cell = value.getValue().toString(); 
            if (cell != null) { 
                sb.append(new String(value.getKey())).append(new String(cell)); 
            } 
        } 
        output.collect(new Text(row.get()), new Text(sb.toString())); 
    }
复制代码

作业配置时需要调用org.apache.hadoop.hbase.mapred.TableMapReduceUtil中的这个方法: initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars)。

复制代码
package test; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.util.Tool; 
 
public class Driver extends Configured implements Tool{ 
 
    @Override 
    public static void run(String[] arg0) throws Exception { 
        // TODO Auto-generated method stub 
        Configuration conf = HBaseConfiguration.create(); 
        conf.set("hbase.zookeeper.quorum.", "localhost");   
        Job job = new Job(conf,"Hbase"); 
        job.setJarByClass(TxtHbase.class); 
        job.setInputFormatClass(TextInputFormat.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(Text.class);
TableMapReduceUtilinitTableMapperJob(
"table", args0[0],MapperClass.class, job);
job.waitForCompletion(
true); }
}
复制代码

主函数

复制代码
package test; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.util.ToolRunner; 
 
/**
 * Command-line entry point for the HBase read job; delegates to
 * {@code Driver} via {@link ToolRunner}.
 */
public class TxtHbase {

    /**
     * Launches the driver and propagates its return code as the process
     * exit status.
     *
     * @param args command-line arguments, forwarded to the driver by ToolRunner
     * @throws Exception if the driver fails
     */
    public static void main(String [] args) throws Exception{
        // The original invoked Driver.run with ToolRunner's argument list and
        // a THDriver class that is not defined anywhere in this example.
        System.exit(ToolRunner.run(new Configuration(), new Driver(), args));
    }
}
复制代码

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9191.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论