HBase (Part 3): Operating HBase from Java and Python, Explained

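Operating HBase from Java and Python

Operating HBase from Python

HBase is written in Java, so operating it from Python requires a cross-language tool such as Thrift to bridge the two languages.

Prerequisites for installing Thrift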

  1. wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz

  2. tar xzf thrift-0.8.0.tar.gz

  3. yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel

  4. yum install boost-devel.x86_64

  5. yum install libevent-devel.x86_64

Installing Thrift

  1. Change into the extracted Thrift directory

  2. Run: ./configure --with-cpp=no --with-ruby=no

  3. Run: make

  4. Run: make install

Generating the Python API for HBase

  • Download the HBase source: wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz

  • Extract the archive, enter the source directory, and locate the Thrift interface definition that the Python module is generated from: find . -name Hbase.thrift. The file is found at: ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

  • Change into that directory: cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/

  • Run: thrift -gen py Hbase.thrift to generate the Python module for HBase

  • Copy the hbase directory under gen-py into the directory that holds the Python scripts you want to run: cp -raf gen-py/hbase/ /test/hbase_test
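As an alternative to copying the generated package, a script can put the gen-py directory on the import path before importing hbase. The following is only a sketch: the helper name add_gen_py_to_path is made up for illustration, and the path in the usage note is an assumption about where gen-py was generated.

```python
import os
import sys


def add_gen_py_to_path(gen_py_dir):
    """Make the Thrift-generated package importable without copying it.

    gen_py_dir is the directory that contains the generated 'hbase' package.
    Returns the absolute path that was placed on sys.path.
    """
    path = os.path.abspath(gen_py_dir)
    # Avoid adding the same directory twice when the helper is called again
    if path not in sys.path:
        sys.path.insert(0, path)
    return path
```

A script would call add_gen_py_to_path('gen-py') (adjusting the path to its own layout) before the line from hbase import Hbase.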

Starting the Thrift service

  • Command: hbase-daemon.sh start thrift

  • Check that the port is being listened on
    Command: netstat -antup | grep 9090
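The same check can be done from Python before a script tries to talk to the Thrift server. This is a minimal sketch using only the standard socket module; the function name is_port_open is illustrative, and the host master and port 9090 in the usage note are taken from the scripts in this article.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection completes the TCP handshake, so success means
        # some process is listening on that port.
        conn = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        # Connection refused, host unreachable, or timed out
        return False
    conn.close()
    return True
```

For the setup above, is_port_open('master', 9090) should return True once hbase-daemon.sh start thrift has finished starting. A True result only shows that something is listening on the port, not that it is the HBase Thrift server.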

Running Python scripts against HBase

  • Create create_table.py to create a table

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

# Connect to the Thrift server started above (host 'master', port 9090)
transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)

transport.open()

#==============================

# Two column families, each keeping one version per cell
base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1)
other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1)

client.createTable('new_music_table', [base_info_contents, other_info_contents])

# The new table should appear in the list of table names
print(client.getTableNames())

transport.close()

Run the script: python create_table.py

  • Create insert_data.py to insert data

from thrift import Thrift 
from thrift.transport import TSocket 
from thrift.transport import TTransport 
from thrift.protocol import TBinaryProtocol 
 
from hbase import Hbase 
from hbase.ttypes import * 
 
transport = TSocket.TSocket('master', 9090) 
transport = TTransport.TBufferedTransport(transport) 
 
protocol = TBinaryProtocol.TBinaryProtocol(transport) 
 
client = Hbase.Client(protocol) 
 
transport.open() 
 
tableName = 'new_music_table' 
rowKey = '1100' 
 
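# A list literal can span several lines, so no line-continuation character is needed
mutations = [Mutation(column="meta-data:name", value="wangqingshui"),
        Mutation(column="meta-data:tag", value="pop"),
        Mutation(column="flags:is_valid", value="TRUE")]

# Apply all three column writes to row '1100' in one call
client.mutateRow(tableName, rowKey, mutations, None)

transport.close()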

  • Create get_one_line.py to read a single row

from thrift import Thrift 
from thrift.transport import TSocket 
from thrift.transport import TTransport 
from thrift.protocol import TBinaryProtocol 
 
from hbase import Hbase 
from hbase.ttypes import * 
 
transport = TSocket.TSocket('master', 9090) 
transport = TTransport.TBufferedTransport(transport) 
 
protocol = TBinaryProtocol.TBinaryProtocol(transport) 
 
client = Hbase.Client(protocol) 
 
transport.open() 
 
tableName = 'new_music_table' 
rowKey = '1100' 
 
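# getRow returns a list of row results; the list is empty if the row does not exist
result = client.getRow(tableName, rowKey, None)

for r in result:
    print('the row is %s' % r.row)
    print('the name is %s' % r.columns.get('meta-data:name').value)
    print('the flag is %s' % r.columns.get('flags:is_valid').value)

transport.close()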
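The lookup r.columns.get(...).value in get_one_line.py fails with an AttributeError when the requested column is missing from the row, because get then returns None. The helpers below are a small sketch of a more defensive read. The names get_value and row_to_dict are made up for illustration, and the only assumption about the row object is what the script above already relies on: a columns dictionary whose values carry a value attribute.

```python
def get_value(row_result, column, default=None):
    """Return the cell value stored under column, or default if it is absent."""
    cell = row_result.columns.get(column)
    return default if cell is None else cell.value


def row_to_dict(row_result):
    """Flatten one row result into a plain {column_name: cell_value} dict."""
    return dict((name, cell.value) for name, cell in row_result.columns.items())
```

Inside the loop of get_one_line.py, get_value(r, 'meta-data:tag', 'n/a') would then yield 'n/a' instead of raising when the row has no tag column.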

  • Create scan_many_lines.py to query HBase data with a scan

from thrift import Thrift 
from thrift.transport import TSocket 
from thrift.transport import TTransport 
from thrift.protocol import TBinaryProtocol 
 
from hbase import Hbase 
from hbase.ttypes import * 
 
transport = TSocket.TSocket('master', 9090) 
transport = TTransport.TBufferedTransport(transport) 
 
protocol = TBinaryProtocol.TBinaryProtocol(transport) 
 
client = Hbase.Client(protocol) 
 
transport.open() 
 
tableName = 'new_music_table' 
 
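scan = TScan()
scanner_id = client.scannerOpenWithScan(tableName, scan, None)
# Fetch at most 10 rows from the scanner
result = client.scannerGetList(scanner_id, 10)

for r in result:
    print('======')
    print('the row is %s' % r.row)

    # Print each column name and its value, separated by a tab
    for k, v in r.columns.items():
        print("\t".join([k, v.value]))

# Release the server-side scanner, then the connection
client.scannerClose(scanner_id)
transport.close()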
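scan_many_lines.py reads a single batch of at most 10 rows. To walk a whole table, the scanner can be polled repeatedly until it is exhausted. The generator below is a sketch of that loop. The name scan_all and the batch_size default are illustrative, and the code assumes that scannerGetList returns an empty list once no rows remain. It uses only client calls from this article plus scannerClose.

```python
def scan_all(client, table_name, scan, batch_size=100):
    """Yield every row matched by scan, fetching batch_size rows per round trip."""
    scanner_id = client.scannerOpenWithScan(table_name, scan, None)
    try:
        while True:
            rows = client.scannerGetList(scanner_id, batch_size)
            if not rows:
                # Assumption: an empty batch means the scanner is exhausted
                break
            for row in rows:
                yield row
    finally:
        # Release the server-side scanner even if the caller stops early
        client.scannerClose(scanner_id)
```

With the client and TScan object from the script above, the loop becomes for r in scan_all(client, tableName, scan): followed by the same printing code.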

Module locations

The hbase module and the thrift module both sit under the python directory: hbase >> python and thrift >> python


Operating HBase from Java

Writing records to HBase

package com.cxqy.baseoperation; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.Cell; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.Get; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
import org.apache.hadoop.hbase.filter.FilterList; 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 
import org.apache.hadoop.hbase.util.Bytes; 
 
public class HbasePutOneRecord { 
 
    public static final String TableName = "user_action_table"; 
    public static final String ColumnFamily = "action_log"; 
 
    public static Configuration conf = HBaseConfiguration.create(); 
    private static HTable table; 
 
    public static void addOneRecord(String tableName, String rowKey, String family, String qualifier, String value)
            throws IOException {
        table = new HTable(conf, tableName);
        try {
            // One Put carries the row key plus a family:qualifier = value cell
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("insert record " + rowKey + " to table " + tableName + " success");
        } finally {
            // Release the resources held by this HTable instance
            table.close();
        }
    }
 
    public static void main(String[] args) throws IOException { 
 
        conf.set("hbase.master", "192.168.87.200:60000"); 
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 
 
        // TODO Auto-generated method stub 
        try { 
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "ip", "192.168.87.101"); 
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "userid", "1100"); 
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "ip", "192.168.1.201"); 
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "userid", "1200"); 
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "ip", "192.168.3.201"); 
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "userid", "1300"); 
        } catch (Exception e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
        } 
    } 
}

Reading a record from HBase

package com.cxqy.baseoperation; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.Cell; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.Get; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
import org.apache.hadoop.hbase.filter.FilterList; 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 
import org.apache.hadoop.hbase.util.Bytes; 
 
public class HbaseGetOneRecord { 
 
    public static final String TableName = "user_action_table"; 
    public static final String ColumnFamily = "action_log"; 
 
    public static Configuration conf = HBaseConfiguration.create(); 
    private static HTable table; 
 
    public static void selectRowKey(String tablename, String rowKey) throws IOException {
        table = new HTable(conf, tablename);
        try {
            Get g = new Get(Bytes.toBytes(rowKey));
            Result rs = table.get(g);

            // An empty Result means the row key does not exist
            if (rs.isEmpty()) {
                System.out.println("==> row " + rowKey + " not found");
                return;
            }

            System.out.println("==> " + Bytes.toString(rs.getRow()));

            for (Cell kv : rs.rawCells()) {
                System.out.println("--------------------" + Bytes.toString(kv.getRow()) + "----------------------------");
                System.out.println("Column Family: " + Bytes.toString(kv.getFamily()));
                System.out.println("Column       : " + Bytes.toString(kv.getQualifier()));
                System.out.println("value        : " + Bytes.toString(kv.getValue()));
            }
        } finally {
            table.close();
        }
    }
 
    public static void main(String[] args) throws IOException { 
 
        conf.set("hbase.master", "192.168.87.200:60000"); 
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 
 
        // TODO Auto-generated method stub 
        try { 
            selectRowKey(TableName, "ip=192.168.87.200-003"); 
        } catch (Exception e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
        } 
    } 
 
}

Deleting a record from HBase

package com.cxqy.baseoperation; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.Cell; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.Get; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
import org.apache.hadoop.hbase.filter.FilterList; 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 
import org.apache.hadoop.hbase.util.Bytes; 
 
public class HbaseDelOneRecord { 
 
    public static final String TableName = "user_action_table"; 
    public static final String ColumnFamily = "action_log"; 
 
    public static Configuration conf = HBaseConfiguration.create(); 
    private static HTable table; 
 
    public static void delOneRecord(String tableName, String rowKey) throws IOException {
        table = new HTable(conf, tableName);
        try {
            // A Delete built from only a row key removes every column of that row
            List<Delete> list = new ArrayList<Delete>();
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            list.add(delete);
            table.delete(list);
            System.out.println("delete record " + rowKey + " success!");
        } finally {
            table.close();
        }
    }
 
    public static void main(String[] args) throws IOException { 
 
        conf.set("hbase.master", "192.168.87.200:60000"); 
        conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 
 
        // TODO Auto-generated method stub 
        try { 
            delOneRecord(TableName, "ip=192.168.87.200-002"); 
        } catch (Exception e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
        } 
    } 
}

Reading records from HBase in bulk

package com.cxqy.baseoperation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseScanManyRecords {

    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";

    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    // Run the scan and print every cell as: rowkey family:qualifier timestamp value
    private static void scanAndPrint(String tableName, Scan scan, boolean printSeparator) throws IOException {
        table = new HTable(conf, tableName);
        try {
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    if (printSeparator) {
                        System.out.println("===============");
                    }
                    for (KeyValue kv : result.raw()) {
                        System.out.print(Bytes.toString(kv.getRow()) + " ");
                        System.out.print(Bytes.toString(kv.getFamily()) + ":");
                        System.out.print(Bytes.toString(kv.getQualifier()) + " ");
                        System.out.print(kv.getTimestamp() + " ");
                        System.out.println(Bytes.toString(kv.getValue()));
                    }
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }
    }

    // Scan the whole table
    public static void getManyRecords(String tableName) throws IOException {
        scanAndPrint(tableName, new Scan(), false);
    }

    // Scan with a filter that keeps only the row whose key equals rowKey
    public static void getManyRecordsWithFilter(String tableName, String rowKey) throws IOException {
        Scan scan = new Scan();
        //      scan.setStartRow(Bytes.toBytes("ip=10.11.1.2-996"));
        //      scan.setStopRow(Bytes.toBytes("ip=10.11.1.2-997"));
        Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKey)));
        scan.setFilter(filter);
        scanAndPrint(tableName, scan, false);
    }

    // Scan with a filter list that keeps rows matching any key in rowKeyList
    public static void getManyRecordsWithFilter(String tableName, ArrayList<String> rowKeyList) throws IOException {
        Scan scan = new Scan();
        List<Filter> filters = new ArrayList<Filter>();
        for (int i = 0; i < rowKeyList.size(); i++) {
            filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKeyList.get(i)))));
        }
        // MUST_PASS_ONE combines the row filters with OR
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
        scan.setFilter(filterList);
        scanAndPrint(tableName, scan, true);
    }

    public static void main(String[] args) throws IOException {

        conf.set("hbase.master", "192.168.159.30:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.159.30,192.168.159.31,192.168.159.32");

        try {
            //          getManyRecords(TableName);
            //          getManyRecordsWithFilter(TableName, "ip=192.11.1.200-0");
            ArrayList<String> whiteRowKeyList = new ArrayList<>();
            whiteRowKeyList.add("ip=192.168.87.200-001");
            whiteRowKeyList.add("ip=192.168.87.200-003");
            getManyRecordsWithFilter(TableName, whiteRowKeyList);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/9152.html
