一、前言
Hive是Hadoop上的数据仓库框架,其设计目的是让精通SQL技能(但Java编程技能相对较弱)的分析师能够在存放到HDFS大规模数据集上运行查询。提出Hive的主要原因是SQL并不是所有的“大数据”的理想工具。
Hive在工作站上运行,它把SQL转换为一系列在Hadoop集群上运行的MapReduce作业,即用MapReduce操作HDFS数据。Hive把数据组织为表,通过这种方式为存储在HDFS上的数据赋予结构。元数据——如表模式——存储在名为metastore的数据库中。
Hive的metastore默认存储在本地机器上,这样就无法和其他用户共享这些定义。后面将会详细讲述如何在生产环境中设置远程共享metastore。
二、HiveQL
用户通过在Hive的解释器交互,发出HiveQL命令,HiveQL是Hive的查询语言,它是SQL的一种方言,和mysql有很大的相似之处。
1、基本命令:
a、显示metastore数据库中的表:
hive> show tables; OK Time taken: 1.528 seconds hive> show tables; OK Time taken: 0.126 seconds hive>
系统采用”懒“策略,第一次比较慢,当创建metastore数据库后加载的就比较快了。该数据库存放在你运行hive命令所在位置下名为metastore_db的目录中。
我的Hive metastore_db的目录位置如下:
[[email protected] admin]# find / -name metastore_db /usr/lib64/R/metastore_db
b.运行Hive脚本
对于较长的脚本,通常保存在.q文件中,在shell环境下用如下命令执行:
% hive -f script.q
如果对于较短的脚本,可以使用-e命令在行内嵌入执行,这两种情况下都不需要加上表示结束的分号。
% hive -e "select * from dummy"
下面是一个生成单行表的方法:
[[email protected] admin]# echo 'X' > /tmp/dummy.txt [[email protected] admin]# cat /tmp/dummy.txt X [[email protected] admin]# hive -e "create table dummy(value STRING);/ > load data local inpath '/tmp/dummy.txt' / > overwrite into table dummy" Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/hive/lib/hive-common-0.10.0-cdh4.3.0.jar!/hive-log4j.properties Hive history file=/tmp/root/hive_job_log_450335d5-274c-4082-81a7-0b4bbe8d1c0c_944836217.txt OK Time taken: 2.321 seconds Copying data from file:/tmp/dummy.txt Copying file: file:/tmp/dummy.txt Loading data to table default.dummy rmr: DEPRECATED: Please use 'rm -r' instead. Moved: 'hdfs://master:8020/user/hive/warehouse/dummy' to trash at: hdfs://master:8020/user/root/.Trash/Current Table default.dummy stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 2, raw_data_size: 0] OK Time taken: 1.369 seconds [[email protected] admin]#
无论是在shell命令还是在交互式环境下,Hive都会把操作运行的时间打印到标准错误输出,可以在启动程序的时候使用– S选项强制不限时这条消息,其结果只是查询输出结果。
带-S的和不带的对比结果如下:
[[email protected] admin]# hive -S -e 'select * from dummy' X [[email protected] admin]# hive -e 'select * from dummy' Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/hive/lib/hive-common-0.10.0-cdh4.3.0.jar!/hive-log4j.properties Hive history file=/tmp/root/hive_job_log_44207314-384b-4f29-a590-834c723b96ad_444463715.txt OK X Time taken: 2.173 seconds [[email protected] admin]#
备注:
其他有用的Hive交互式程序的特性有:使用a!前缀来运行宿主操作系统的命令;使用dfs来访问hadoop文件系统。
c、示例
和RDMS一样,Hive把数据组织成表,下面我们用create table语句为气象数据创建一个表格(各行换行符分隔,用’/t’分隔字段):
create table records (year string, temperature int, quality int) row format delimited fields terminated by '/t';
创建表格完成后,我们可以向Hive中输入数据,overwrite关键字告诉Hive删除表所对应的目录下的所有文件,如果省略overwrite,Hive就简单的把新文件加入目录,如果有同名文件就替换掉,其他的不作处理。
load data local inpath 'input/ncdc/micro-tab/sample.txt' overwrite into table records;
这样命令告诉Hive把指定的本地文件放到它的存储目录中,这仅仅是一个简单的文件系统操作,不解析文件,也不会将其转换为内部格式,这是因为hive并不强制星星某种特定的文件格式。文件以原样子逐字存储,Hive对文件没有做任何修改。
Hive的表存储在HDFS中,由(fs.default.name 设为默认值file:///),在Hive的仓库目录中,表存储为目录。仓库录由hive.metastore.warehouse.dir设定,默认值为/user/hive/warehouse,在HDFS的根目录下。
Hive的查询语句:
hive> select year, max(temperature) > from records > where temperature!=9999 > and (quality=0 or quality=1 or quality=4 or quality=5 or quality=9) > group by year;
d、 多个Hive 共享hadoop集群
如果准备让多个Hive用户共享一个Hadoop集群,则需要更改Hive所使用目录的权限,对所有用户可写。用以下命令创建,并设置合适的权限:
% hadoop -mkdir /tmp % hadoop -chmod a+w /tmp % hadoop -mkdir /user/hive/warehouse % hadoop -chmod a+w /user/hive/warehouse
如果所有用户在同一个用户组中,把仓库目录权限设置为g+w即可。
e、在一个会话中使用SET命令更改设置
hive> set hive.enforce.bucketing=true;
可以只使用带属性名的SET命令查看属性的当前值:
hive> set hive.enforce.bucketing;
设置属性的优先级,数值越小,优先级越高。
1、Hive SET命令
2、命令行-hiveconf选项
3、hive-site.xml
4、hive-default.xml
5、hadoop-site.xml(或等价的core-site.xml、hdfs-site.xml、mapred-site.xml)
6、hadoop-default.xml(或等价的core-default.xml、hdfs-default.xml、mapred-default.xml)
可以对日志的配置进行设置,下面的语句可以方便的将调试信息发送到控制台:
% hive -hiveconf hive.root.logger=DEBUG, console
e、metastore
metastore是Hive的元数据的集中存放地,metastore包括两部分:后台和数据库的存储。默认情况下,metastore服务和hive服务运行在同一个JVM中,它包含一个内嵌的以本地磁盘作为存储的Derby数据库实例,使用内嵌数据库是Hive入门最简单的方法,在局限是一次只能访问一个磁盘上的数据文件,这就意味着一次只能为每一个metastore打开一个hive会话,如果启动两个会话时就会报如下错误:
Failed to start database 'metastore_db'
若果要支持多会话,就要配置使用一个独立的数据库,这种配置称为”本地metastore”。
f、HiveQL和SQL的比较
SQL的延迟级别为秒级,而HiveQL延迟为分钟级别。HiveSQL支持create table as select语法,而SQL不支持。SQL支持存储过程,而HiveSQL支持用户定义函数,MapReduce脚本。
Hive的string类似其他数据库中的VARCHAR,但不能声明存储长度,最长可以存储2GB字符数(理论上)。当然这样做效率较低,可以使用Sqoop对大对象的处理。
可以使用CAST进行数据类型转换,例如CAST(‘1’ AS INT),可以把字符串’1’转换为整数1。如果转换失败,那么表达式会返回空(CAST (‘X’ AS INT))。
g、复杂类型
Hive有三种复杂数据类型:array、map、struct,复杂数据类型必须用尖括号”<>”指明其中数据字段的类型。
如下表所示的表定义有三列,每一种对应一种复杂的数据类型:
create table complex( col1 arrary<int>, col2 map<string, int>, col3 struct<a:string, b:int, c:double> );
下面是展示每种数据类型的访问操作:
hive>select col1[0], col2['b'], col3.c from complex;
h、操作与函数
可以通过hive shell下面键入show functions获取函数列表,用describe function length获取函数帮助。
提供普通的SQL操作:关系操作(x=’a’,空值判断x is null,模式匹配 x like ‘A%’),算数操作(x+1),以及逻辑或(or),如x or y。MySql和Hive中字符串连接使用concat函数。
i、表
Hive表格逻辑上由存储的数据和描述表格中数据形式的相关数据组成。数据一般存放在HDFS中,当然也可以放在本地文件系统中,而把元数据放在关系数据库中。
数据库支持命令空间,0.90的hive也支持命名空间,提供了create database dbname, use dbname以及drop database dbname这样的语句。
托管表和外部表
这两种表的区别表现在load和drop命令的语义上。
加载托管表时,Hive把数据移动到仓库目录,例如;
create table managed_table(dummy string); load data inpath '/user/tom/data.txt' into table managed_table;
把文件从hdf://user/tom/data.txt 移动到hive的数据仓库目录managed_table表的目录,即hdfs://user/hive/warehouse/managed_table。
如果随后要删除一个表格,可以用
drop table managed_table;
它的表(包括数据和元数据)会一起被删除,这就是hive所谓的”数据托管的含义“。
而对于外部表而言,这两个操作结果就不一样了,用户来控制数据的创建和删除。外部数据的位置要在创建表格的时候说明:
create external table external_table(dummy string) location '/user/tom/external_table'; load data inpath '/user/tom/data.txt' into table external_table;
使用external关键字之后,hive知道数据并不由自己管理,因此不会把数据移动到自己的仓库目录。丢弃外部表时,Hive不会碰数据,仅删除元数据。
经验法则:所有的数据都在hive中完成,使用托管表,如果使用Hive和其他工具共同处理一个数据集,应该使用外部表。普遍的做法是把存放在hdfs(由其他进程创建)的初始数据集用作外部表,然后使用hive的变换功能把数据移动到托管的hive表,可以使用hive导出数据供其他应用程序使用。
j、分区和桶
hive把表组织成分区(partition),根据分区列(partition column,如日期)对表进行粗略划分的机制。使用分区可以加快数据分片(slice)的查询速度。分区可以进一步划分为桶(bucket)。它会为数据提供额外的结构以获得更搞笑的查询处理。例如,通过根据用户ID来划分桶,我们可以在所有用户集合的随机样本上快速计算基于用户的查询。
使用分区并不会影响大范围查询的执行,我们依然可以查询跨多个分区的整个数据集合。
对于假想的日志文件,在根据日期对日志进行分区外,还可以能根据国家对每个分区进行子分区(subpartition),以加速根据地理位置进行查询。分区在创建表格的时候用partitioned by子句定义,该子句需要定义列的列表。我们可能要把表记录定义为由时间戳和日志行构成:
create table logs(ts bigint, line string) partitioned by (dt string, country string);
在我们把数据加载到分区表的时候要显示指定分区值:
load data local inpath 'input/hive/partitions/file1' into table logs partition (dt='2010-01-01', country='GB');
在文件系统级别,分区只是表示目录下嵌套的子目录。把更多文件加载到日志表以后,目录结构可能像下面这样:
/user/hive/warehouse/logs/dt=2010-01-01/country=GB/file1 /file2 /country=US/file3 /user/hive/warehouse/logs/dt=2010-01-02/country=GB/file4 /country=US/file5 /file6
可以使用show partitions 表名 命令查询表中有哪些分区:
hive> show partitions logs; dt=2010-01-01/country=GB dt=2010-01-01/country=US dt=2010-01-02/country=GB dt=2010-01-02/country=US
注意:partitioned by 子句中的列定义是表中正式的列,称为“分区列”(partition column)。但是数据文件并不包含这些列的值,因为它们源于目录名。
实际使用的过程中可以以普通方式使用分区列。Hive会对输入进行修剪,从而只扫描相关分区。例如:
select ts, dt, line from logs where country='GB';
将只扫描file1、file2、file4。还要注意,这个查询也返回dt分区列的值。这个值是hive从目录名中读取的,因为他们在数据文件中并不存在。
桶:
把表(或分区)组织成桶(bucket)有两个理由。第一理由是获得更高效的查询处理效率,第二个理由是取样更高效。
使用clustered by子句来指定划分桶所在的列和要划分为桶的个数:
create table bucketed_users (id int, name string) clustered by (id) into 4 buckets;
可以使用HiveQL对两个划分了桶的表进行连接。
桶中的数据可以根据一个或者多个列另外进行排序,所以这样对每个桶的连接就编程了高效的合并排序,因此可以进一步提升map端连接的效率。以下语法说明一个连接使其使用排序桶。
create table bucketed_users(id int, name string) clustered by (id) sorted by (id asc) into 4 buckets;
有两种方式将表中的数据划分成桶:一是将Hive外的数据加载到划分成桶的表中,二是针对已有的表可以用Hive来划分桶。
建议用hive来划分桶,以为hive不检查数据文件中的桶是否和表定义中的桶一致。
有个没有划分桶的表格:
hive> select * from user; OK 0 Nat 2 Joe 3 Kay 4 Ann Time taken: 2.178 seconds
要将桶内填充成员,需要将hive.enforce.bucketing的属性设置为true。
这样hive就知道使用表定义中生命的变量来创建桶。然后使用insert命令创建即可:
hive> insert overwrite table bucketed_users > select * from user;
物理上每个桶就是表(或分区)里的一个文件。但是桶n是按照字典排列的第n个文件。事实上,桶对应于MapReduce的输出文件分区,一个作业产生的桶(输出文件)和reduce任务个数相等。从下面执行的MapReduce程序中可以看出:
Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 4 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapred.reduce.tasks=<number> Starting Job = job_201306210458_0022, Tracking URL = http://master:50030/jobdetails.jsp?jobid=job_201306210458_0022 Kill Command = /opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/hadoop/bin/hadoop job -kill job_201306210458_0022 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 4
查看bucketed_users表的布局可以得到
hive> dfs -ls /user/hive/warehouse/bucketed_users; Found 4 items -rw-r--r-- 2 admin supergroup 12 2013-06-22 22:14 /user/hive/warehouse/bucketed_users/000000_0 -rw-r--r-- 2 admin supergroup 0 2013-06-22 22:14 /user/hive/warehouse/bucketed_users/000001_0 -rw-r--r-- 2 admin supergroup 6 2013-06-22 22:14 /user/hive/warehouse/bucketed_users/000002_0 -rw-r--r-- 2 admin supergroup 6 2013-06-22 22:14 /user/hive/warehouse/bucketed_users/000003_0
4个新建的文件
具体桶内的数据如下所示:
hive> dfs -cat /user/hive/warehouse/bucketed_users/*0_0; 0Nat 4Ann hive> dfs -cat /user/hive/warehouse/bucketed_users/*1_0; hive> dfs -cat /user/hive/warehouse/bucketed_users/*2_0; 2Joe hive> dfs -cat /user/hive/warehouse/bucketed_users/*3_0; 3Kay
使用tablesample子句对表进行取样,我们可以获得相同的结果。这个子句会把查询限定在表的一部分桶内,而不是整个表:
hive> select * from bucketed_users > tablesample(bucket 1 out of 4 on id); ........... OK 0 Nat 4 Ann Time taken: 7.289 seconds
1/4 第一个桶,下面的查询会会返回1/2桶:
hive> select * from bucketed_users > tablesample(bucket 1 out of 2 on ; ............. OK 0 Nat 4 Ann 2 Joe Time taken: 10.367 seconds
当然,可以用其他比例对若干个桶进行取样,因为取样并不是一个精确的操作,因此这个比例一定是桶的整数倍。
用此取样分桶表是非常高效的操作,如果使用rand()函数对没有划分成桶的表进行取样,及时只需要读取很小的一部分样本,也要输入整个数据集。所以rand的特点是扫描次数多,效率低,用下面查询结果耗时和上面对比可以得出。
hive> select * from bucketed_users > tablesample(bucket 1 out of 4 on rand()); ...................... OK Time taken: 10.379 seconds
k、存储格式
最简单的是文本格式,同时支持面向行的和面向列的二进制格式。
分隔符的问题,create table ……语句等价于下面的语句:
create table row format delimited fields terminated by '/001' collection iterms terminated by '/002' map keys terminated by '/003' lines terminated by '/n' stored as textfile;
注意,可以使用八进制表示分隔符,例如001表示Control-A。
支持顺序文件Sequence File和RCFile,按列记录文件。
在Hvie中可以使用下面的句子还启用面向列的存储:
create table ........... row format serde 'org.apache.hadoop.hive.seder2.columnar.ColumnarSerDe' store as RCFile;
示例:利用正则表达式从一个文本文件中读取定长的观测站数据
create table station (usaf string, wban, name string) row format serde 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' with serdeproperties ( "input.regex"="(//d{6}) (//d{5}) (.{29}) .*");
SerDe要设置相应的属性值,在这里要设置RegexSerDe特有的input.regex属性。
用load data向表中输入数据:
load data local inpath “input/ncdc/metadata/stations-fixed-with.txt” into table station;
加载操作并不适用表的SerDe。
从表中检索数据时,用简单的查询所示,反序列化会调用SerDe解析这个字段:
hive> select * from stations limit 4; 10000 99999 BOGUS NORWAY 010003 99999 BOGUS NORWAY 010010 99999 JAN MAYEN 010013 99999 ROST
l、导入数据
如果想把数据从关系数据库直接导入hive,请参考Sqoop。
insert overwrite table
insert overwrite table target select col1, col2 from source;
对于分区的表,可以使用partition子句来指明
insert overwrite table target partition (dt='2010-01-01') select col1, col2 from source;
使用overwrite会替换掉目标,而如果要向已经填充了内容的表添加记录,可以使用不带overwrite 关键字的load data操作。
从hive 0.6.0开始,可以在select 语句中通过使用分区值来动态指明分区:
insert overwrite table target partition dt select col1, col2, dt from source;
这种方法称为动态分区插入法,这一特性默认是关闭的,可以通过命令set 命令查看开启,将hive.exec.dynamic.partition=true。
当前hive不支持使用insert into values (…….)的形式。
多表插入:
在hive中,可以把insert语句倒过来,把from语句放在最前面,查询的效果是一样的:
from source insert overwrite table target select col1, col2;
在一个表查询中,可以使用多个insert的语句,只要扫描一遍表,就可以生成多个不相交的输出。
FROM records2 INSERT OVERWRITE TABLE stations_by_year SELECT year, COUNT(DISTINCT station) GROUP BY year INSERT OVERWRITE TABLE records_by_year SELECT year, COUNT(1) GROUP BY year INSERT OVERWRITE TABLE good_records_by_year SELECT year, COUNT(1) WHERE temperature != 9999 AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9) GROUP BY year;
这里有一个源表,三个目标表。
create table ……as select…….
create table target as select col1, col2 from source;
m、表的修改
重命名表
alter table source rename to target;
添加新的列
alter table target add columns (col3 string);
n、表的丢弃
drop table删除表的元数据和数据
也可以仅仅删除数据文件,保留表结构,默认为空表
hive > dfs -rmr /user/hive/warehouse/my_table;
这时候,可以使用like关键字创建一个与第一个表模式相同的新表:
create table new_table like existing_table;
o、查询数据
排序和聚集
可以用order by子句对数据进行全局排序,但是它只是用一个reducer完成的。在多数情况下不需要全局排序的情况下,可以使用sort by局部排序,为每一个reduce产生一个排序文件。有些时候需要控制某个特定行应该到哪个reducer,通常是为了后续的聚集操作,这就是hive的 distribute by所做的事情,下面的例子根据年份和气温对气象数据进行排序,以确保所有年份所在的行,最终都在一个reduce分区中。
hive> from records2 >select year, temperature > distribute by year > sort by year asc, temperature desc;
1949 111
1949 78
1950 22
1950 0
1950 11
如果sort by 和distribute by中所用到的列相同,可以缩写为cluster by一遍同时制定两者相同的列。
使用Hadoop Streaming 、transform 、map、reduce子句这样的方法,便可以在hive中调用外部脚本。
p、连接
同mapreduce相比,hive的好处是简化了常用操作。
内连接:
hive> SELECT * FROM sales; Joe 2 Hank 4 Ali 0 Eve 3 Hank 2 hive> SELECT * FROM things; 2 Tie 4 Coat 3 Hat 1 Scarf
sales:人名及所购商品的id,sales:上平id及名称
hive> SELECT sales.*, things.* > FROM sales JOIN things ON (sales.id = things.id); Joe 2 2 Tie Hank 2 2 Tie Eve 3 3 Hat Hank 4 4 Coat
hive只支持等值连接,条件是两个表的id必须相等。
与数据库不同,hive不支持where子句,且只允许在from子句中出现一个表。
用explain 关键字可以查询连接的过程中使用多少个MapReduce,可以用explain extended查看更详细的信息。
EXPLAIN SELECT sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);
外连接:
使用left outer join,查询会返回左侧表(sales)中的每一个数据行,及时这些行与这各表索要连接的表(things)中的任何数据对应。
hive> SELECT sales.*, things.* > FROM sales LEFT OUTER JOIN things ON (sales.id = things.id); Ali 0 NULL NULL
Joe 2 2 Tie
Hank 2 2 Tie
Eve 3 3 Hat
Hank 4 4 Coat
hive也支持right outer join右外连接
hive> SELECT sales.*, things.* > FROM sales RIGHT OUTER JOIN things ON (sales.id = things.id); NULL NULL 1 Scarf Joe 2 2 Tie Hank 2 2 Tie Eve 3 3 Hat Hank 4 4 Coat
全外连接,full outer join
hive> SELECT sales.*, things.* > FROM sales FULL OUTER JOIN things ON (sales.id = things.id); Ali 0 NULL NULL NULL NULL 1 Scarf Joe 2 2 Tie Hank 2 2 Tie Eve 3 3 Hat Hank 4 4 Coat
半连接:
下面的in 子句能够查找things表中在sales表中出现过的所有商品:
select *
from things
where things.id in (select id from sales);
但是hive不支持in子句查询,所以要对其进行改写,改写后如下:
hive> select * from things; OK 2 Tie 4 Coat 3 Hat 1 Scarf Time taken: 0.282 seconds hive> select * from sales; OK Joe 2 Hank 4 Ali 0 Eve 3 Hank 2 Time taken: 0.227 seconds
对上述表操作,左半连接,只要左半自身。
hive> select * > from things left semi join sales on (sales.id=things.id); .................. OK 2 Tie 3 Hat 4 Coat Time taken: 14.528 seconds
map连接:
如果要指定使用map连接,需要在sql中使用C语言风格的注释,从而给出提示:
select /* mapjoin(things) */ sales.*, things.* from sales join things on (sales.id=things.id);
执行这个查询不适用reducer,只在所有输入上进行聚集,使用下面的语法启用优化选项:set hive.optimize.bucketmapjoin=true;
q、子查询
hive对子查询支持有限,只允许出现在select 语句的from子句中。
下面的语句可以查出每年每个气象站最高气温的均值:
SELECT station, year, AVG(max_temperature) FROM ( SELECT station, year, MAX(temperature) AS max_temperature FROM records2 WHERE temperature != 9999 AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = GROUP BY station, year ) mt GROUP BY station, year;
内层查询查询出每个气象站的最高气温,外层查询使用AVG聚集函数计算这些最高读数的均值。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9803.html