Co2y's Blog

hive相关及延伸

要讨论班了,赶快写点东西

从hadoop开始说起

先挖坑,待填

hive详解

hive架构

下图是hive的架构图

体系结构

Hive的体系结构可以分为以下几部分

  1. 用户接口
  2. Hive将元数据存储在数据库中,如mysql、derby。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。
  3. 解释器、编译器、优化器完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有MapReduce调用执行。
  4. Hive的数据存储在HDFS中,大部分的查询、计算由MapReduce完成

数据存储

对于数据存储,Hive没有专门的数据存储格式,也没有为数据建立索引,用户可以非常自由的组织Hive中的表,只需要在创建表的时候告诉Hive数据中的列分隔符和行分隔符,Hive就可以解析数据。Hive中所有的数据都存储在HDFS中,存储结构主要包括数据库、文件、表和视图。Hive中包含以下数据模型:Table内部表,External Table外部表,Partition分区,Bucket桶。Hive默认可以直接加载文本文件,还支持sequence file 、RCFile。

执行原理

下图是hive的执行原理

Hive构建在Hadoop之上,

  1. HQL中对查询语句的解释、优化、生成查询计划是由Hive完成的
  2. 所有的数据都是存储在Hadoop中
  3. 查询计划被转化为MapReduce任务,在Hadoop中执行(有些查询没有MR任务,如:select * from table)
  4. Hadoop和Hive都是用UTF-8编码的

sql编译过程

Hive编译器将一个Hive QL转换操作符。操作符Operator是Hive的最小的处理单元,每个操作符代表HDFS的一个操作或者一道MapReduce作业。Operator都是hive定义的一个处理过程,所有的操作构成了Operator图,hive正是基于这些图关系来处理诸如limit, group by, join等操作。这个在第二部分会有详解。

hive编译器的组成

编译流程

与数据库的区别

  • 数据格式。Hive中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性:列分隔符(通常为空格、”\t”、”\x001″)、行分隔符(”\n”)以及读取文件数据的方法(Hive中默认有三个文件格式TextFile,SequenceFile以及RCFile)。由于在加载数据的过程中,不需要从用户数据格式到Hive定义的数据格式的转换,因此,Hive在加载的过程中不会对数据本身进行任何修改,而只是将数据内容复制或者移动到相应的HDFS目录中。而在数据库中,不同的数据库有不同的存储引擎,定义了自己的数据格式。所有数据都会按照一定的组织存储,因此,数据库加载数据的过程会比较耗时。
  • 数据更新。由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不支持对数据的改写和添加,所有的数据都是在加载的时候中确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用INSERT INTO … VALUES添加数据,使用UPDATE … SET修改数据。
  • 索引。之前已经说过,Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于MapReduce的引入, Hive可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了Hive不适合在线数据查询。
  • 可扩展性。由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的(世界上最大的Hadoop集群在Yahoo!,2009年的规模在4000台节点左右)。而数据库由于ACID语义的严格限制,扩展行非常有限。目前最先进的并行数据库Oracle在理论上的扩展能力也只有100台左右。

元数据库

表名 说明 关联键
TBLS 所有hive表的基本信息 TBL_ID,SD_ID
TABLE_PARAM 表级属性,如是否外部表,表注释等 TBL_ID
COLUMNS Hive表字段信息(字段注释,字段名,字段类型,字段序号) SD_ID
SDS 所有hive表、表分区所对应的hdfs数据目录和数据格式 SD_ID,SERDE_ID
SERDE_PARAM 序列化反序列化信息,如行分隔符、列分隔符、NULL的表示字符等 SERDE_ID
PARTITIONS Hive表分区信息 PART_ID,SD_ID,TBL_ID
PARTITION_KEYS Hive分区表分区键 TBL_ID
PARTITION_KEY_VALS Hive表分区名(键值) PART_ID

从上面表的内容来看,hive整个创建表的过程已经比较清楚了。

  1. 解析用户提交hive语句,对其进行解析,分解为表、字段、分区等hive对象
  2. 根据解析到的信息构建对应的表、字段、分区等对象,从 SEQUENCE_TABLE中获取构建对象的最新ID,与构建对象信息(名称,类型等)一同通过DAO方法写入到元数据表中去,成功后将SEQUENCE_TABLE中对应的最新ID+5。

join及查询操作

  • Hive只支持等值连接(equality joins)、外连接(outer joins)和(left/right joins)。Hive不支持所有非等值的连接,因为非等值连接非常难转化到map/reduce任务。
  • join 时,每次 map/reduce 任务的逻辑: reducer 会缓存 join 序列中除了最后一个表的所有表的记录, 再通过最后一个表将结果序列化到文件系统。这一实现有助于在reduce端减少内存的使用量。实践中,应该把最大的那个表写在最后(否则会因为缓存浪费大量内存)。
  • LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现。Hive 当前没有实现 IN/EXISTS 子查询,所以你可以用 LEFT SEMI JOIN 重写你的子查询语句。LEFT SEMI JOIN的限制是, JOIN子句中右边的表只能在ON子句中设置过滤条件,在WHERE子句、SELECT子句或其他地方过滤都不行。
  • 当前的Hive不支持在一条查询语句中有多Distinct。如果要在Hive查询语句中实现多Distinct,需要使用至少n+1条查询语句(n为distinct的数目),前n条查询分别对n个列去重,最后一条查询语句对n个去重之后的列做Join操作,得到最终结果。

hive sql to mapreduce

例子

join

1
select u.name, o.orderid from order o join user u on o.uid = u.uid;

在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式)

其它例子见

美团技术博客

本节全部内容均参考该博客

转换流程

  1. Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
  2. 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
  3. 遍历QueryBlock,翻译为执行操作树OperatorTree
  4. 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
  5. 遍历OperatorTree,翻译为MapReduce任务
  6. 物理层优化器进行MapReduce任务的变换,生成最终的执行计划

hive 优化

本节从实际应用和参数配置的角度来优化hive

根据hadoop计算框架的特性可能会发生以下问题

  1. 数据倾斜
  2. jobs太多
  3. count(distinct ),在数据量大的情况下,效率较低,如果是多count(distinct )效率更低,因为count(distinct)是按group by 字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。举个例子:比如男uv,女uv,像淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。

通用方法:

  1. 解决数据倾斜问题。sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端的汇总合并优化,使数据倾斜不成问题。set hive.groupby.skewindata=true;
  2. 减少job数。
  3. 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
  4. 对小文件进行合并,是行至有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的正向影响。

性能底下的根源

在mapreduce框架不变的情况下,Hadoop的核心能力是parition和sort,因而这也是优化的根本。

配置角度优化

列裁剪

Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。 例如,若有以下查询:

1
SELECT a,b FROM q WHERE e<10;

  在实施此项查询中,Q 表有 5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要 的 3 列 a、b、e,而忽略列 c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。

  裁剪所对应的参数项为:hive.optimize.cp=true(默认值为真)

分区裁剪

1
2
SELECT * FROM (SELECTT a1,COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; #(多余分区)
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;

查询语句若将“subq.prtn=100”条件放入子查询中更为高效,可以减少读入的分区 数目。 Hive 自动执行这种裁剪优化。

  分区参数为:hive.optimize.pruner=true(默认值为真)   

join

在编写带有 join 操作的代码语句时,应该将条目少的表/子查询放在 Join 操作符的左边。 因为在 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的 value 值小的放前,大的放后,这便是“小表放前”原则。 若一条语句中有多个 Join,依据 Join 的条件相同与否,有不同的处理方法。

map join

Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。

相关的参数为:

  • hive.join.emit.interval = 1000
  • hive.mapjoin.size.key = 10000
  • hive.mapjoin.cache.numrows = 10000

group by

进行GROUP BY操作时需要注意一下几点:

  • Map端部分聚合
      事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。

    这里需要修改的参数为:

  hive.map.aggr=true(用于设定是否在 map 端进行聚合,默认值为真) hive.groupby.mapaggr.checkinterval=100000(用于设定 map 端进行聚合操作的条目数)

  • 有数据倾斜时进行负载均衡 

  此处需要设定 hive.groupby.skewindata,当选项设定为 true 是,生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。   

合并小文件

我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。

  用于设置合并属性的参数有:

  • 是否合并Map输出文件:hive.merge.mapfiles=true(默认值为真)
  • 是否合并Reduce 端输出文件:hive.merge.mapredfiles=false(默认值为假)
  • 合并文件的大小:hive.merge.size.per.task=25610001000(默认值为 256000000)

改进sql

无效ID在关联时的数据倾斜问题

问题:日志中常会出现信息丢失,比如每日约为 20 亿的全网日志,其中的 user_id 为主 键,在日志收集过程中会丢失,出现主键为 null 的情况,如果取其中的 user_id 和 bmw_users 关联,就会碰到数据倾斜的问题。原因是 Hive 中,主键为 null 值的项会被当做相同的 Key 而分配进同一个计算 Map。

解决方法 1:user_id 为空的不参与关联,子查询过滤 null

1
2
3
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION All SELECT * FROM log a WHERE a.user_id IS NULL

解决方法 2 如下所示:函数过滤 null

1
2
3
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id END =b.user_id;

调优结果:原先由于数据倾斜导致运行时长超过 1 小时,解决方法 1 运行每日平均时长 25 分钟,解决方法 2 运行的每日平均时长在 20 分钟左右。优化效果很明显。

  我们在工作中总结出:解决方法2比解决方法1效果更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,job 数为2。解决方法2中 job 数是1。这个优化适合无效 id(比如-99、 ‘’,null 等)产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的 数据分到不同的Reduce上,从而解决数据倾斜问题。因为空值不参与关联,即使分到不同 的 Reduce 上,也不会影响最终的结果。附上 Hadoop 通用关联的实现方法是:关联通过二次排序实现的,关联的列为 partion key,关联的列和表的 tag 组成排序的 group key,根据 pariton key分配Reduce。同一Reduce内根据group key排序。   

不同数据类型关联产生的倾斜问题

问题:不同数据类型 id 的关联会产生数据倾斜问题。

  一张表 s8 的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。 s8 的日志中有 32 为字符串商品 id,也有数值商品 id,日志中类型是 string 的,但商品中的 数值 id 是 bigint 的。猜想问题的原因是把 s8 的商品 id 转成数值 id 做 hash 来分配 Reduce, 所以字符串 id 的 s8 日志,都到一个 Reduce 上了,解决的方法验证了这个猜测。

解决方法:把数据类型转换成字符串类型

1
2
SELECT * FROM s8_log a LEFT OUTER
JOIN r_auction_auctions b ON a.auction_id=CASE(b.auction_id AS STRING)

利用Hive对UNION ALL优化的特性

多表 union all 会优化成一个 job。

问题:比如推广效果表要和商品表关联,效果表中的 auction_id 列既有 32 为字符串商 品 id,也有数字 id,和商品表关联得到商品的信息。

解决方法:

1
2
3
4
5
6
SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION All
SELECT auction_string_id AS auction_id FROM auctions) b
ON a.auction_id=b.auction_id

比分别过滤数字 id,字符串 id 然后分别和商品表关联性能要好。

  这样写的好处:1 个 MapReduce 作业,商品表只读一次,推广效果表只读取一次。把 这个 SQL 换成 Map/Reduce 代码的话,Map 的时候,把 a 表的记录打上标签 a,商品表记录 每读取一条,打上标签 b,变成两个对,<(b,数字 id),value>,<(b,字符串 id),value>。

  所以商品表的 HDFS 读取只会是一次。

解决Hive对UNION ALL优化的短板

Hive 对 union all 的优化的特性:对 union all 优化只局限于非嵌套查询。

  • 消灭子查询内的 group by
示例 1:子查询内有 group by
1
2
3
SELECT * FROM
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3)t3
GROUP BY c1,c2,c3

  从业务逻辑上说,子查询内的 GROUP BY 怎么都看显得多余(功能上的多余,除非有 COUNT(DISTINCT)),如果不是因为 Hive Bug 或者性能上的考量(曾经出现如果不执行子查询 GROUP BY,数据得不到正确的结果的 Hive Bug)。所以这个 Hive 按经验转换成如下所示:

1
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2)t3 GROUP BY c1,c2,c3

  调优结果:经过测试,并未出现 union all 的 Hive Bug,数据是一致的。MapReduce 的 作业数由 3 减少到 1。

t1 相当于一个目录,t2 相当于一个目录,对 Map/Reduce 程序来说,t1,t2 可以作为 Map/Reduce 作业的 mutli inputs。这可以通过一个 Map/Reduce 来解决这个问题。Hadoop 的 计算框架,不怕数据多,就怕作业数多。

  但如果换成是其他计算平台如 Oracle,那就不一定了,因为把大的输入拆成两个输入, 分别排序汇总后 merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排 序比冒泡排序的性能更优)。

  • 消灭子查询内的 COUNT(DISTINCT),MAX,MIN。
1
2
3
4
SELECT * FROM
(SELECT * FROM t1
UNION ALL SELECT c1,c2,c3 COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3;

  由于子查询里头有 COUNT(DISTINCT)操作,直接去 GROUP BY 将达不到业务目标。这时采用 临时表消灭 COUNT(DISTINCT)作业不但能解决倾斜问题,还能有效减少 jobs。

1
2
3
4
5
6
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
(SELECT c1,c2,c3,income,0 AS uv FROM t1
UNION ALL
SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2) t3
GROUP BY c1,c2,c3;

  
 job 数是 2,减少一半,而且两次 Map/Reduce 比 COUNT(DISTINCT)效率更高。

调优结果:千万级别的类目表,member 表,与 10 亿级得商品表关联。原先 1963s 的任务经过调整,1152s 即完成。

  • 消灭子查询内的 JOIN
1
2
3
SELECT * FROM
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
GROUP BY c1,c2;

上面代码运行会有 5 个 jobs。加入先 JOIN 生存临时表的话 t5,然后 UNION ALL,会变成 2 个 jobs。  

1
2
3
INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);

  调优结果显示:针对千万级别的广告位表,由原先 5 个 Job 共 15 分钟,分解为 2 个 job 一个 8-10 分钟,一个3分钟。

GROUP BY替代COUNT(DISTINCT)达到优化效果

计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT) 会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。

原有代码

1
2
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329)
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid

  关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:测试数据:169857条

1
2
3
4
5
6
#统计每日IP
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS IP FROM logdfs WHERE logdate='2014_12_29';
耗时:24.805 seconds
#统计每日IP(改造)
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp;
耗时:46.833 seconds

  测试结果表名:明显改造后的语句比之前耗时,这是因为改造后的语句有2个SELECT,多了一个job,这样在数据量小的时候,数据不会存在倾斜问题。

优化总结

  • hive.exec.reducers.bytes.per.reducer #这个参数控制一个job会有多少个reducer来处理,依据的是输入文件的总大小。默认1GB。
  • hive.exec.reducers.max #这个参数控制最大的reducer的数量, 如果 input / bytes per reduce > max 则会启动这个参数所指定的reduce个数。 这个并不会影响mapre.reduce.tasks参数的设置。默认的max是999。
  • mapred.reduce.tasks #这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer。默认是-1。

如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM 2、如果reduce太多: 产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。如果我们不指定mapred.reduce.tasks, hive会自动计算需要多少个reducer。