阅读 465

Hive 调优总结,让 Hive 调优想法不再碎片化

通过阅读比较多的Hive调优材料,并根据自己的实践,总结Hive 调优如下,让Hive调优想法不再凌乱、碎片化,而是形成结构。

@[toc]

部分参考链接说明

本文参考的部分链接如下:

参考链接1www.cnblogs.com/smartloli/p…

参考链接2blog.csdn.net/mrlevo520/a…

这个链接基于上面的链接做了自己的实践经验总结,纠正了上面那篇文章中一些因为版本太老导致的参数不一致的问题。

参考链接3blog.csdn.net/qq_35036995…

关于 group by 和 join 更详细的解释说明,有详细的例子和图片说明。

参考链接4: blog.csdn.net/hellojoy/ar…

关于 join 更多的调优参数 skew join

参考链接5: weidongzhou.wordpress.com/2017/06/08/…

一篇专门关于 skewjoin 的解释

参考链接6: www.cnblogs.com/xd502djj/p/…

优化总结,大致看看就行,大部分的内容在本文档中能看到。

根据 MapReduce 运行全流程,对每个环节进行调优

MapReduce运行流程图

参考链接:www.cnblogs.com/zxbdboke/p/… (非常好的博客文章,了解 mapReduce 的文件处理全流程)

在这里插入图片描述

MapReduce运行流程关键环节及相关参数

  • 文件输入:对文件进行切片,可设置切片大小,可设置是否合并小文件

  • Map:Map 数量 = 输入文件切片数量

  • Map 文件输出:是否合并设置,合并为多大,什么情况下会合并

  • Reduce: hive 自动计算 reduce 个数 或者 显式指定 reduce 个数

  • Reduce 文件输出:是否合并,合并为多大,什么情况下合并

  • 最终文件压缩:job 之间 文件输出是否压缩,HiveSql 执行完毕的最终结果是否合并。

文件输入阶段

切片大小设置

mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue因此,默认情况下,切片大小=blocksize 
复制代码
  • Split 切片:是MapReduce的最小计算单元,计算公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))。因此默认与 HDFS 的 block 保持一致。

  • maxsize(切片最大值): 参数如果调到比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。

  • minsize(切片最小值): 参数调的比blockSize大,则可以让切片变得比blocksize还大。

  • 注意,==MapReduce 的切片是基于文件进行切片,不是切分数据集整体,也不是切分 block==。

示例:

--设置maxsize大小为10M,也就是说一个block的大小为10M
set mapreduce.input.fileinputformat.split.maxsize=10485760;
复制代码

小文件合并

(Hive 默认就是合并的)

注意,==MapReduce 的切片是基于文件进行切片,不是切分数据集整体,也不是切分 block==。所以,如果有好多小文件,且不开启合并小文件的功能,一个小文件就会对应一个 MapTask,这是非常不划算的。(具体参考本文档上面 MapReduce 框架原理。非常好的博文

# hive 的默认值就是这个,不用特别设置
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
复制代码

Map

Map个数:输入文件切片的个数就是 Map 个数,跟 输入文件集合的 文件个数、文件大小有关系。

如何控制:通过设置 split(切片) 大小、合并小文件,控制 Map 的个数。

Map 不是越多越好:大量小文件,如果每个小文件一个 Map,是集群资源的浪费。

Map 也不是越少越好:如果一个 127 M 文件,但是只有 2 个字段,一共有几百万行、几千万行数据,且数据处理逻辑还挺复杂,那么一个 Map 明显就有点少了。

Map 输出文件

合并 map 阶段和 reduce 阶段输出的小文件

我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并 Map 和 Reduce 的结果文件来消除这样的影响。

用于设置合并属性的参数有:

# 是否合并Map输出文件,默认值为真
hive.merge.mapfiles=true

# 是否合并Reduce 端输出文件,默认值为假:
hive.merge.mapredfiles=false

# 合并文件的大小,默认值为 256000000:
hive.merge.size.per.task=256*1000*1000()
复制代码

更多参数:

作用:当 map 端 或者 reduce 端 输出的平均文件大小小于我们设定的这个值时,就开启合并,将文件合并成一个大文件。
如果 map 端文件合并开关开启了,就合并,否则不合并
如果 reduce 端文件合并开关开启了,就合并,否则不合并
  <property>
    <name>hive.merge.smallfiles.avgsize</name>
    <value>16000000</value>
    <description>
      When the average output file size of a job is less than this number, Hive will start an additional
      map-reduce job to merge the output files into bigger files. This is only done for map-only jobs if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true.
    </description>
  </property>
复制代码

Reduce

根据配置自动计算

1)每个Reduce处理的数据量默认是256MB

  • 当数据量特别大,并且集群资源足够的时候,可以将默认值适当调小,让多个 reduce task 并行处理,但是要注意,如果 有数据倾斜,或者数据只会发送到两个 reduce 上面,开启再多的reduce也没有用,具体情况参考下面的 join 优化、group by 优化。reduce个数也不是越多越好,如果一个reduce 处理的数据量太小,那么开启、释放 reduce task 的时间就会占用很大一部分,得不偿失;

  • 当数据量特别大,但是集群资源并没有那么多的时候,reduce 的个数设置的太多,甚至超过了集群总体线程数,那么部分task 就会阻塞等待先申请到资源的task 运行完毕才能申请到资源,这样的话,任务不仅不会更快执行完,反而会不停地开启、释放 reduce task ,也就是不停地开启、释放 JVM,造成额外的开销,这个时候为了避免这种情况,反而要把默认值适当调大,让一个 reduce 一次性处理比较大的数据集。

set hive.exec.reducers.bytes.per.reducer=256000000;
复制代码
  1. 每个任务最大的reduce数,默认为1009
set hive.exec.reducers.max=1009;
复制代码
  1. 计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
复制代码

直接指定

# 默认值是 -1 ,就是说 默认是系统自动计算 reduce 个数
hive> set mapreduce.job.reduces;
mapreduce.job.reduces=-1

# 设置每一个job中reduce个数
set mapreduce.job.reduces=3;
复制代码

注意

如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM。

如果reduce太多: 过多启动和初始化,也会消耗时间和资源。且产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。

Reduce 输出文件

见 Map 输出文件,道理是一样的

输出文件压缩

注意:这里的压缩,不包含 Map 输出文件的压缩,只包含 reduce 输出文件的压缩,有如下两种情况:

  1. 整个 HiveSql 任务执行完了,最终的结果写入到 HDFS 上的时候,要不要压缩
  2. 某个 HiveSql 可能有多个 mapreduce 任务串行,那么 每一个 mapreduce 任务完成后,这个任务输出的 临时中间结果文件 要不要压缩。

Hive表中间数据压缩。

#设置为true为激活中间数据压缩功能,默认是false,没有开启。
# This controls whether intermediate files produced by Hive between multiple map-reduce jobs are compressed.
set hive.exec.compress.intermediate=true;

# 设置中间数据的压缩算法。
# 这个参数有问题吧?这难道不是 map 输出的压缩格式吗?
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;

复制代码

Hive表最终输出结果压缩

# This controls whether the final outputs of a query (to a local/HDFS file or a Hive table) is compressed.
set hive.exec.compress.output=true;
set mapred.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
复制代码

Hive 配置文件中的说明:

<property>
    <name>hive.exec.compress.output</name>
    <value>false</value>
    <description>
      This controls whether the final outputs of a query (to a local/HDFS file or a Hive table) is compressed.
      The compression codec and other options are determined from Hadoop config variables mapred.output.compress*
    </description>
  </property>

  <property>
    <name>hive.exec.compress.intermediate</name>
    <value>false</value>
      
    <description>
      This controls whether intermediate files produced by Hive between multiple map-reduce jobs are compressed.
      The compression codec and other options are determined from Hadoop config variables mapred.output.compress*
    </description>
      
  </property>
复制代码

join 操作调优

a. Join 原则

  • shuffle 之后,在 reduce 阶段,左边的表先加载进内存,所以把小表放在左边不容易 OOM
  • 多表 join, 如果 a.id = b.id, b.id=d.id,join 的字段是一致的,只开启一个 Job 任务
  • 如果不一致,有几个 join,就是几个 Job 任务

b. MapJoin

/*+ MAPJOIN(pv) / 用法是老版本的,现在 hive 是自动进行 mapjoin 的,无需通过 /+ MAPJOIN(pv) */ 告诉解释器 开启 mapjoin 。

--默认为true
set hive.auto.convert.join = true;

--大表小表的阈值设置(默认25M以下认为是小表)
set hive.mapjoin.smalltable.filesize=25000000;
复制代码

c. SkewJoin

参考链接4: blog.csdn.net/hellojoy/ar…

关于 join 更多的调优参数

参考链接5: weidongzhou.wordpress.com/2017/06/08/…

skewjoin 原理:

如果某个 Key 产生了数据倾斜,则 这个 key 暂时不进行运算,存入 HDFS,马上开启另外一个 Job 专门处理这个 key,用 map Join 的方式进行。

在这里插入图片描述

  • hive.optimize.skewjoin.compiletime

    如果建表语句元数据中指定了skew key,则使用set hive.optimize.skewjoin.compiletime=true开启skew join。

    可以通过如下建表语句指定SKEWED key:

    CREATE TABLE list_bucket_single (key STRING, value STRING)
    SKEWED BY (key) ON (1,5,6) [STORED AS DIRECTORIES];
    复制代码
  • hive.optimize.skewjoin

    该参数为在运行时动态指定数据进行skewjoin,一般和hive.skewjoin.key参数一起使用

    set hive.optimize.skewjoin=true;
    set hive.skewjoin.key=100000;
    复制代码

    以上参数表示当记录条数超过100000时采用skewjoin操作

  • 区别

    hive.optimize.skewjoin.compiletime和hive.optimize.skewjoin区别为前者为编译时参数,后者为运行时参数。前者在生成执行计划时根据元数据生成skewjoin,此参数要求倾斜值一定;后者为运行过程中根据数据条数进行skewjoin优化。hive.optimize.skewjoin实际上应该重名为为hive.optimize.skewjoin.runtime参数,考虑兼容性没有进行重命名

Group By 操作调优

group by:聚合

  • sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端的汇总合并优化,使数据倾斜不成问题。
  • count(distinct ),在数据量大的情况下,效率较低,如果是多count(distinct )效率更低,因为count(distinct)是按group by 字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。举个例子:比如男uv,女uv,像淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。

关于 Group BY,我的总结就是两句话:

  1. 能 预聚合 的,就进行 预聚合。如:sum, count(1), max(), min()。(avg不能进行预聚合)
  2. 不能进行预聚合的,在必要的情况下就开启负载均衡,如:count(distinct)。注意:也不是所有情况下都需要开启这个。注意要满足两个条件。

a Map 端部分聚合(预聚合)

// 用于设定是否在 map 端进行聚合,默认值为真
hive.map.aggr=true 

// 用于设定 map 端进行聚合操作的条目数
hive.groupby.mapaggr.checkinterval=100000

// 如果 map 端的聚合率 大于 50%,就自动关闭预聚合功能,这个是在 参考链接3 中有说明,hive 配置文件中有这个配置项
Hive.map.aggr.hash.min.reduction=0.5  
复制代码

hive.groupby.mapaggr.checkinterval:map端做聚合时,group by 的key所允许的数据行数,超过该值则进行分拆,默认是100000;(参考:blog.csdn.net/u010003835/…

b 开启负载均衡

考虑下面情况

select gender, count(distinct id) from user group by gender
复制代码

这种情况下: gender 是非散列的,只有两个值,因此也就只有两个 reducer; 而 map 却需要把所有的 id 发送到 reduce 端,这个没办法提前进行预聚合。(为什么?要想清楚。以为这是去重操作,某个 id 可能在多个 map 端都由,在 map 端去重,会造成去重不彻底,重复计数。) 这样就会导致两个 reduce 服务要处理的数据量实在是太大了,所以需要开启负载均衡

负载均衡原理

将 sql 变成 两个 MR 程序。

第一个 MR 的 reduce_key 是 gender + id,这样就是散列的,就会开启很多的 reduce,不会存在数据倾斜的情况。在 reduce 端,就进行第一个 reduce 聚合,分别在自己的服务器上计数。

注意:这里要想清楚,为什么第一次 reduce 不会影响最终的结果,不会出现上面说的去重不彻底的问题?因为 gender + id 如果是一样的,肯定发送到了 同一个 reduce 上面,就不会有问题。

注明:女_id1 是 key,1 是 单词计数的那个计数

在这里插入图片描述

第二个 MR 做最终的聚合。

虽然这个聚合还是只有两个 reduce,但是每个 map 端的数据,只有两条了。

在这里插入图片描述

参数

# 这个值默认是 false
set hive.groupby.skewindata = true;
复制代码

当选项设定为 true 是,生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

看情况开启负载均衡

如下面的例子

select id, count(gender) from user group by id
复制代码

很明显,reduce_key 此时 为 id, id 本身就是散列的,所以数据本身就很平衡,不用开启。

==负载均衡适用情况==

  1. groupby_key,也就是 reduce_key,不散列,如 gender
  2. distinct_key, 非常散列

其它的一些情况,mapJoin 已经足够解决问题了。

SQL 语句优化

严格模式

Hive提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询。

通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。

--设置非严格模式(默认)
set hive.mapred.mode=nonstrict;

--设置严格模式
set hive.mapred.mode=strict;
复制代码

(1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行

--设置严格模式下 执行sql语句报错; 非严格模式下是可以的
select * from order_partition;

异常信息:Error: Error while compiling statement: FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "order_partition" Table "order_partition" 
复制代码

(2)对于使用了order by语句的查询,要求必须使用limit语句

--设置严格模式下 执行sql语句报错; 非严格模式下是可以的
select * from order_partition where month='2019-03' order by order_price; 

异常信息:Error: Error while compiling statement: FAILED: SemanticException 1:61 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'order_price'
复制代码

(3)限制笛卡尔积的查询

严格模式下,避免出现笛卡尔积的查询

列裁剪

博客文章中写了有参数,默认就是开启的。

我在hive 的配置文件中没有搜到那个参数,新版本应该是自动进行的。

分区裁剪

同上。

避免使用 笛卡尔积

join 不加 on 条件

GROUP BY 替代 COUNT(DISTINCT) 达到优化效果

计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT) 会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。

数据量小的时候无所谓,数据量大的情况下,由于count distinct 操作只能用一个reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般count distinct使用先group by 再count的方式替换。

 --每个reduce任务处理的数据量 默认256000000(256M)
 set hive.exec.reducers.bytes.per.reducer=32123456;
 
 -- 很明显会将所有的 IP 发送到同一个 reduce 上面去,所以存在问题。
 select  count(distinct ip )  from log_text;
 
 转换成
 select count(ip) from (select ip from log_text group by ip) t;
 
 虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的。
复制代码

无效 key 问题

方法1:过滤 + Union

第一种方法:先过滤,再 union,也可以不用 union,随意

SELECT * FROM log a 
JOIN bmw_users b 
ON a.user_id IS NOT NULL AND a.user_id=b.user_id 
UNION All SELECT * FROM log a WHERE a.user_id IS NULL
复制代码

方法2:null 转为 随机字符串

第二种方法:将空值变成 字符串 + 随机数字,就会将其分散到不同的 reduce上面。因为空值不参与关联,即使分到不同 的 Reduce 上,也不会影响最终的结果

SELECT * FROM log a LEFT OUTER 
JOIN bmw_users b ON 
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id END =b.user_id;
复制代码

实践证明:方案二 比 方案一 效果更好一些。因为只有 1 个 job, IO 也少了。

不同数据类型关联产生的倾斜问题

略过。参考链接2

使用 union All

Hive 多表 union all 会优化成一个 job。要充分利用 这个 操作。

参考链接1 中的例子。

Union All 不能优化 嵌套 join count 的如何办

略过。

集群运行角度调优

fetch 抓取

Fetch抓取是指,==Hive中对某些情况的查询可以不必使用MapReduce计算==

例如:select * from employee;

在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台

在hive-default.xml.template文件中 ==hive.fetch.task.conversion默认是more==,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

本地模式

在Hive客户端测试时,默认情况下是启用hadoop的job模式,把任务提交到集群中运行,这样会导致计算非常缓慢;

Hive可以通过本地模式在单台机器上处理任务。对于小数据集,执行时间可以明显被缩短。

案例实操

--开启本地模式,并执行查询语句
set hive.exec.mode.local.auto=true;  //开启本地mr
  
--设置local mr的最大输入数据量,当输入数据量小于这个值时采用local  mr的方式,
--默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
  
--设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,
--默认为4
set hive.exec.mode.local.auto.input.files.max=5;

  
--执行查询的sql语句
select * from employee cluster by deptid;
复制代码

决定是否执行本地模式的条件,都需要满足:

  1. 总数据量小于给定值,默认 128 M。
  2. 文件数 小于给定数量,默认 4 。

并行执行

把一个sql语句中没有相互依赖的阶段并行去运行。提高集群资源利用率

--开启并行执行
set hive.exec.parallel=true;
--同一个sql允许最大并行度,默认为8。
set hive.exec.parallel.thread.number=16;
复制代码

具体解释: (参考链接

hive.exec.parallel参数控制在同一个sql中的不同的job是否可以同时运行,默认为false.

下面是对于该参数的测试过程:

测试sql:

select r1.a
from (
   select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1 
   join 
   (select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2 
   on (r1.a=r2.b);
复制代码

1 当参数为false的时候,三个job是顺序的执行

set hive.exec.parallel=false;
复制代码

2 但是可以看出来其实两个子查询中的sql并无关系, 可以并行的跑。

set hive.exec.parallel=true;
复制代码

总结: 在资源充足的时候hive.exec.parallel会让那些存在并发job的sql运行得更快,但同时消耗更多的资源 可以评估下hive.exec.parallel对我们的刷新任务是否有帮助.

JVM 重用

  • JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

  • JVM重用可以使得JVM实例在同一个job中重新使用N次。减少进程的启动和销毁时间。

    -- 设置jvm重用个数
    set mapred.job.reuse.jvm.num.tasks=5;
    复制代码

推测执行

  • Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
--开启推测执行机制
set hive.mapred.reduce.tasks.speculative.execution=true;
复制代码