Java,python操作Hbase
python操作Hbase
由于Hbase是Java开发的,所以如需要用Python对Hbase进行操作,就需要借助Thrift等工具实现跨语言访问
安装Thrift之前所需准备
-
wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz
-
tar xzf thrift-0.8.0.tar.gz
-
yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libeventdevel zlib-devel python-devel ruby-devel openssl-devel
-
yum install boost-devel.x86_64
-
yum install libevent-devel.x86_64
安装Thrift
-
进入Thrift解压目录
-
运行:./configure --with-cpp=no --with-ruby=no
如图:
-
运行:make
-
运行:make install
产生针对Python的Hbase的API
-
下载hbase源码:wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz
-
进入源码目录并查找thrift对python的支持模块:find . -name Hbase.thrift,查找后地址为:./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
-
进入查找后的目录:cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/
-
运行命令:thrift -gen py Hbase.thrift,生成python对Hbase的模块
如图:
-
进入gen-py目录,将hbase目录拷贝到需要运行python脚本文件的同级目录中,命令:cp -raf gen-py/hbase/ /test/hbase_test
启动Thrift服务
-
命令:hbase-daemon.sh start thrift
如图:
-
检查端口是否被监听
命令:netstat -antup | grep 9090
然后执行python文件,对hbase进行操作
-
创建create_table.py文件,进行创建表操作
from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol from hbase import Hbase from hbase.ttypes import * transport = TSocket.TSocket('master', 9090) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Hbase.Client(protocol) transport.open() #============================== base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1) other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1) client.createTable('new_music_table', [base_info_contents, other_info_contents]) print client.getTableNames()
运行python文件,命令:python create_table.py
- 创建insert_data.py文件,进行插入数据操作
# insert_data.py -- insert one row into 'new_music_table' via Thrift.
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()

try:
    tableName = 'new_music_table'
    rowKey = '1100'
    # One Mutation per cell; "family:qualifier" names the target column.
    # BUG FIX: the original used '/' as a line-continuation character,
    # which is a SyntaxError in Python; bracketed continuation (used
    # here) or '\' is correct.
    mutations = [Mutation(column="meta-data:name", value="wangqingshui"),
                 Mutation(column="meta-data:tag", value="pop"),
                 Mutation(column="flags:is_valid", value="TRUE")]
    # Last argument is the attributes map (None = no extra attributes).
    client.mutateRow(tableName, rowKey, mutations, None)
finally:
    # BUG FIX: release the connection; the original never closed it.
    transport.close()
- 创建get_one_line.py文件,进行获取数据操作
from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol from hbase import Hbase from hbase.ttypes import * transport = TSocket.TSocket('master', 9090) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Hbase.Client(protocol) transport.open() tableName = 'new_music_table' rowKey = '1100' result = client.getRow(tableName, rowKey, None) for r in result: print 'the row is ' , r.row print 'the name is ' , r.columns.get('meta-data:name').value print 'the flag is ' , r.columns.get('flags:is_valid').value
- 创建scan_many_lines.py文件,进行对hbase数据查询操作(扫描)
from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol from hbase import Hbase from hbase.ttypes import * transport = TSocket.TSocket('master', 9090) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Hbase.Client(protocol) transport.open() tableName = 'new_music_table' scan = TScan() id = client.scannerOpenWithScan(tableName, scan, None) result = client.scannerGetList(id, 10) for r in result: print '======' print 'the row is ' , r.row for k, v in r.columns.items(): print "/t".join([k, v.value])
模块存放位置
hbase 模块目录需放到 python 脚本可导入的路径下;thrift 的 python 库同样需放到 python 可导入的路径下
Java操作Hbase
向Hbase中写记录
package com.cxqy.baseoperation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Writes single cells to an HBase table using the HBase 0.98-era
 * {@link HTable} client API.
 */
public class HbasePutOneRecord {

    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";

    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Insert one cell ({@code family:qualifier = value}) into the given row.
     *
     * @param tableName table to write to
     * @param rowKey    row key of the target row
     * @param family    column family name
     * @param qualifier column qualifier within the family
     * @param value     cell value (stored as UTF-8 bytes)
     * @throws IOException on any HBase client failure
     */
    public static void addOneRecord(String tableName, String rowKey,
            String family, String qualifier, String value) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
                    Bytes.toBytes(value));
            table.put(put);
            System.out.println("insert record " + rowKey + " to table "
                    + tableName + " success");
        } finally {
            // BUG FIX: the original opened a new HTable per call and never
            // closed it, leaking one connection per insert.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum",
                "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "ip", "192.168.87.101");
            addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "userid", "1100");
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "ip", "192.168.1.201");
            addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "userid", "1200");
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "ip", "192.168.3.201");
            addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "userid", "1300");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
从Hbase中读记录
package com.cxqy.baseoperation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Reads a single row from an HBase table and prints every cell
 * (HBase 0.98-era {@link HTable} client API).
 */
public class HbaseGetOneRecord {

    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";

    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Fetch one row by key and print its row key, column family,
     * qualifier and value for every cell.
     *
     * @param tablename table to read from
     * @param rowKey    row key to fetch
     * @throws IOException on any HBase client failure
     */
    public static void selectRowKey(String tablename, String rowKey)
            throws IOException {
        table = new HTable(conf, tablename);
        try {
            Get g = new Get(rowKey.getBytes());
            Result rs = table.get(g);
            System.out.println("==> " + new String(rs.getRow()));
            // Cell#getRow/getFamily/... are deprecated in 0.98 but kept
            // here to match the API level used throughout this tutorial.
            for (Cell kv : rs.rawCells()) {
                System.out.println("--------------------" + new String(kv.getRow())
                        + "----------------------------");
                System.out.println("Column Family: " + new String(kv.getFamily()));
                System.out.println("Column :" + new String(kv.getQualifier()));
                System.out.println("value : " + new String(kv.getValue()));
            }
        } finally {
            // BUG FIX: the original never closed the HTable, leaking the
            // connection on every call.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum",
                "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            selectRowKey(TableName, "ip=192.168.87.200-003");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
在Hbase中删除某个记录
package com.cxqy.baseoperation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Deletes one whole row from an HBase table
 * (HBase 0.98-era {@link HTable} client API).
 */
public class HbaseDelOneRecord {

    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";

    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    /**
     * Delete the entire row identified by {@code rowKey}.
     *
     * @param tableName table to delete from
     * @param rowKey    row key to remove
     * @throws IOException on any HBase client failure
     */
    public static void delOneRecord(String tableName, String rowKey)
            throws IOException {
        table = new HTable(conf, tableName);
        try {
            // The batch-delete List form is kept from the original; for a
            // single row, table.delete(delete) would work equally well.
            List<Delete> list = new ArrayList<Delete>();
            Delete delete = new Delete(rowKey.getBytes());
            list.add(delete);
            table.delete(list);
            System.out.println("delete record " + rowKey + " success!");
        } finally {
            // BUG FIX: the original never closed the HTable, leaking the
            // connection on every call.
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.87.200:60000");
        conf.set("hbase.zookeeper.quorum",
                "192.168.87.200,192.168.87.201,192.168.87.202");
        try {
            delOneRecord(TableName, "ip=192.168.87.200-002");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
从Hbase中批量读记录
package com.cxqy.baseoperation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Scans an HBase table, optionally restricted by row-key filters
 * (HBase 0.98-era {@link HTable} client API).
 */
public class HbaseScanManyRecords {

    public static final String TableName = "user_action_table";
    public static final String ColumnFamily = "action_log";

    public static Configuration conf = HBaseConfiguration.create();
    private static HTable table;

    // Prints one cell as: rowkey family:qualifier timestamp value
    private static void printResult(Result result) {
        for (KeyValue kv : result.raw()) {
            System.out.print(new String(kv.getRow()) + " ");
            System.out.print(new String(kv.getFamily()) + ":");
            System.out.print(new String(kv.getQualifier()) + " ");
            System.out.print(kv.getTimestamp() + " ");
            System.out.println(new String(kv.getValue()));
        }
    }

    /**
     * Full-table scan; prints every cell of every row.
     *
     * @param tableName table to scan
     * @throws IOException on any HBase client failure
     */
    public static void getManyRecords(String tableName) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    printResult(result);
                }
            } finally {
                // BUG FIX: scanners hold server-side resources; the
                // original never closed them.
                scanner.close();
            }
        } finally {
            // BUG FIX: the original never closed the HTable.
            table.close();
        }
    }

    /**
     * Scan restricted to a single exact row key via a {@link RowFilter}.
     *
     * @param tableName table to scan
     * @param rowKey    the only row key that passes the filter
     * @throws IOException on any HBase client failure
     */
    public static void getManyRecordsWithFilter(String tableName, String rowKey)
            throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            // scan.setStartRow(Bytes.toBytes("ip=10.11.1.2-996"));
            // scan.setStopRow(Bytes.toBytes("ip=10.11.1.2-997"));
            Filter filter = new RowFilter(CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(rowKey)));
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    printResult(result);
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }
    }

    /**
     * Scan restricted to a whitelist of row keys: one RowFilter per key,
     * combined with MUST_PASS_ONE (logical OR).
     *
     * @param tableName  table to scan
     * @param rowKeyList row keys to match exactly
     * @throws IOException on any HBase client failure
     */
    public static void getManyRecordsWithFilter(String tableName,
            ArrayList<String> rowKeyList) throws IOException {
        table = new HTable(conf, tableName);
        try {
            Scan scan = new Scan();
            List<Filter> filters = new ArrayList<Filter>();
            for (int i = 0; i < rowKeyList.size(); i++) {
                filters.add(new RowFilter(CompareOp.EQUAL,
                        new BinaryComparator(Bytes.toBytes(rowKeyList.get(i)))));
            }
            FilterList filerList = new FilterList(
                    FilterList.Operator.MUST_PASS_ONE, filters);
            scan.setFilter(filerList);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    System.out.println("===============");
                    printResult(result);
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }
    }

    public static void main(String[] args) throws IOException {
        conf.set("hbase.master", "192.168.159.30:60000");
        conf.set("hbase.zookeeper.quorum",
                "192.168.159.30,192.168.159.31,192.168.159.32");
        try {
            // getManyRecords(TableName);
            // getManyRecordsWithFilter(TableName, "ip=192.11.1.200-0");
            ArrayList<String> whiteRowKeyList = new ArrayList<>();
            whiteRowKeyList.add("ip=192.168.87.200-001");
            whiteRowKeyList.add("ip=192.168.87.200-003");
            getManyRecordsWithFilter(TableName, whiteRowKeyList);
            // getManyRecords(TableName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9152.html