一文带你理清Spark Core调优的方方面面

3,701 阅读43分钟

前言

本文的注意事项

  1. 观看本文前,可以先百度搜索一下Spark程序的十大开发原则看看哦
  2. 文章虽然很长,可并不是什么枯燥乏味的内容,而且都是面试时的干货(我觉得🤣)可以结合PC端的目录食用,可以直接跳转到你想要的那部分内容
  3. 图非常的重要,是文章中最有价值的部分。如果不是很重要的图一般不会亲手画
  4. 此文会很大程度上借鉴美团的文章分享内容和Spark官方资料去进行说明,也会结合笔者自身的理解。
  5. 数据倾斜部分和Spark Streaming调优息息相关

一、简述Spark的十大开发原则

这里会直接一笔带过,不作详细的展开了,大家可以通过搜索引擎能找到它们的详细说明。我们用最直接的话来阐述

1.1 避免创建重复的RDD

就如字面上的意思,对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。避免我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

1.2 尽可能复用同一个RDD

对于类似多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。因为Spark中的RDD如果不缓存下来每次它都会从源头处开始重新计算一遍

比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。

1.3 对多次使用的RDD进行持久化

对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。从而保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

补充:Spark的持久化级别

这些英文其实都简单到能直接看出来是啥意思了···需要多留意的词或者句子我会用粗体打上

持久化级别 含义解释
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

(补充)如何选择一种最合适的持久化策略

默认情况下,性能最高的当然是MEMORY_ONLY,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。

但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。

这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略。如果采用这个举措,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

1.4 尽量避免使用shuffle类算子

因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

1.5 使用map-side预聚合的shuffle操作

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

最鲜明的例子其实就是reduceByKey和groupByKey,因为groupByKey是不会预聚合的

groupByKey:

img
img

reduceByKey:

img
img

我们可以看到reduceByKey在shuffle前先对key相同的进行了聚合

1.6 使用高性能的算子

这块可以自行了解,美团文章中给出的例子有下面5个

使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后进行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition与sort类操作

1.7 广播大变量

在使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

简单一句话说明就是把原本每个task里面都得整一个变量的,不过现在就在Executor中存一份,然后task需要的时候就过来拿就可以了

1.8 使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  1. 在使用到外部变量时,该变量会被序列化后进行网络传输
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能,性能高10倍左右。

1.9 优化数据结构

Java中,有三种类型比较耗费内存:

  1. 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  2. 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  3. 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

  1. 能用json字符串的不要用对象表示,因为对象头额外占16个字节

  2. 能不用字符串就不用用字符串,因为字符串额外占40个字节,比如,能用1 就不要用”1”

  3. 尽量用属组代替集合类型

  4. 当然不要为了性能好而性能好,我们还是要兼顾代码的可读性和开发效率。

1.10 尽可能数据本地化

下文中会说明

二、Spark的运行流程

这和Stage的划分一样基本上说是面试必问的问题,问也很简单,说一个Spark任务然后提交后的整个流程给说说看

2.1 Driver的初始化

首先是Driver的初始化,图中已经把步骤标明清楚了,就不展开说明了(draw.io 出了些问题不知道为啥字体背景颜色默认天蓝色了😓,我也设置不回来,所以就将就一下吧)

2.2 Task的生成及分配

当代码遇到一个action算子的时候,会产生一个job任务,在产生任务之后,DAGScheduler 会进行Stage的划分(涉及宽窄依赖和划分算法)。stage里面会有task任务,同一个Stage里面的task,任务逻辑一样,只是处理的数据不一样而已 。然后Task会被分发到各个Worker中去运行

这里提到的Task的分配算法我这里稍微提一下,其实这个是关于Spark Core调优的十大原则中的最后一点“尽可能数据本地化”所阐述的。

(补充)关于进程本地化级别的描述

进程本地化级别:

  1. PROCESS_LOCAL:进程本地化

代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好.

  1. NODE_LOCAL:节点本地化代码和数据在同一个节点中;

比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输

  1. NO_PREF
    对于task来说,数据从哪里获取都一样,没有好坏之分

  2. RACK_LOCAL:机架本地化
    数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输

  3. ANY
    数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

我们提交任务后有Spark任务的监控界面,大家一定要利用好这个界面,Spark的界面是做得很好的。比如我们看到这个task的数据本地性是NODE_LOCAL说明是极好的,但是如果有你的task任务的数据本地性较差,可以尝试如下调优:

(补充)如何调优

spark.locality.wait 默认值是3s 这个代表的意思是,task任务分配的时候,先是按照 _PROCESS_LOCAL 的这种方式去分配task的,但是如果 PROCESS_LOCAL 这个不满足,那么默认就等3秒,看能不能按照这级别去分配,但是如果等了3秒也实现不了。那么就按 NODE_LOCAL 这个级别去分配

以此类推,每次都是等三秒。但是我们知道,如果想代码运行速度快,那么就尽可能的让task分配在PROCESS_LOCALNODE_LOCAL 级别,所以调优的时候,就让task在这两种级别的时候多等一会儿,这样尽可能的把任务分配到这两个级别。所以默认3秒就有点少了。

spark.locality.wait.process 30s
spark.locality.wait.node 30s

在这两个级别的时候设置多等一会儿

2.3 回到Executor的步骤说明

在Executor里面会生成一个线程池,这个线程池其实是对应了Driver初始化图中的第4步,早就已经生成好的了

有很多小伙伴可能还真不了解各个名词的关系,这里也一并在图中说一下,就是一个Application里面会有很多Job,Job里面会划分Stage,Stage里面又会有许多Task,然后一个Task对应一个Partition,就这么简单

一、基于Spark内存模型调优

1.1 概述

我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。

而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,比如我们公司使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

1.2 静态内存模型

在2016年 spark 1.6 版本以前 spark的executor使用的静态内存模型,但是在spark1.6开始,多增加了一个统一内存模型。通过spark.memory.useLegacyMode 这个参数去配置。默认这个值是false,带表用的是新的动态内存模型,如果想用以前的静态内存模型,那么就要把这个值改为true。

我们先用一个比官方更为简单的图先说明一下大概,你可以先大致这么去理解

这里就是我们平时提交的—executor-memory的划分,实际上就是把我们的一个executor分成了三部分,一部分是Storage内存区域,一部分是execution区域,还有一部分是其他区域。如果使用的静态内存模型,那么用这几个参数去控制:

spark.storage.memoryFraction:默认0.6
spark.shuffle.memoryFraction:默认0.2  
所以第三部分就是0.2

如果我们cache数据量比较大,或者是我们的广播变量比较大,那我们就把spark.storage.memoryFraction这个值调大一点。但是如果我们代码里面没有广播变量,也没有cache,shuffle又比较多,那我们要把spark.shuffle.memoryFraction 这值调大。

好的然后我们可以上复杂一点的那张图了。其实你会发现它就是比我上方的那个多了预留的部分和一个unroll

静态内存模型的缺点:

我们配置好了Storage内存区域和execution区域后,我们的一个任务假设execution内存不够用了,但是它的Storage内存区域是空闲的,两个之间不能互相借用,不够灵活,所以才出来我们新的统一内存模型。

1.3 统一内存模型


动态内存模型先是预留了300m内存,防止内存溢出。

动态内存模型把整体内存分成了两部分,由spark.memory.fraction这个参数表示 默认值是0.6 这部分又划分成为两个小部分。 这两部分其实就是:Storage内存和execution内存。由spark.memory.storageFraction 这个参数去调配,如果spark.memory.storageFraction这个值配的是0.5,那说明这0.6里面 storage占了0.5,也就是execution占了0.1 。(注意:这里的零点几完全就是相对于总内存来说的,千万不要以为spark.memory.storageFraction是0.5是指占spark.memory.fraction的0.5的意思,不是的,它是占总内存的0.5的意思

统一内存模型有什么特点呢?

Storage内存和execution内存 可以相互借用。不用像静态内存模型那样死板,但是是有规则的:

场景一:Execution使用的时候发现内存不够了,然后就会把storage的内存里的数据驱逐到磁盘上。

场景二:一开始execution的内存使用得不多,但是storage使用的内存多,所以storage就借用了execution的内存,但是后来execution也要需要内存了,这个时候就会把storage的内存里的数据写到磁盘上,腾出内存空间。

细心的小伙伴也会发现,每次都是去折腾storage。
是因为execution里面的数据是马上就要用的,而storage里的数据不一定马上就要用。

1.4 资源调优的部分

了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,同时也给出了一个调优的参考值。

1.4.1 num-executors

参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

我觉得正常来说一开始 num-executors 先按照1/10个节点数量去试水是比较合适的,也就是1000个节点就来100个,100个节点就10个

1.4.2 executor-memory

参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致其它同事的作业无法运行。

1.4.3 executor-cores

参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

个人觉得1个cpu core对应3个task,这个情况是效果最佳的

1.4.4 driver-memory

参数说明:该参数用于设置Driver进程的内存

参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

1.4.5 spark.default.parallelism

参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。

试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

1.4.6 spark.storage.memoryFraction

这个东西其实刚刚都已经提到过了🤣。参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。

但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

1.4.7 spark.shuffle.memoryFraction

这个刚刚也提到过了。参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

1.4.8 小总结

资源参数的调优,没有一个固定的值,需要根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

--master yarn-cluster这个参数需要注意,因为不同的Spark版本这个参数是不一致的。所以在提交任务时记得要注意我们集群中的版本,如果是Spark1.5版本,就是刚刚的--master yarn-cluster,如果是2.4版本,这个参数又变成了--master yarn了,所以一定要注意

如果出现了类似于java.lang.OutOfMemoryError, ExecutorLostFailure, Executor exit code 为143, executor lost, hearbeat time out, shuffle file lost···等等,先别紧张,很有可能就是内存除了问题,可以先尝试增加内存。如果还是解决不了,那么请再把注意力放到数据倾斜方面

二、数据倾斜调优

2.1 概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

2.1.1 数据倾斜发生时的现象

绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

2.1.2 数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。

比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

上图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

2.1.3 如何定位数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况
首先要看的,就是数据倾斜发生在第几个stage中
如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜

我们可以这样来定位发生数据倾斜的算子:Spark Web UI中点进去一个Application里面会有很多job,因为job的界定是action算子来分割的,所以我们就看我们代码中的action算子来判断代码位置,再点进去job会有stage,stage的界定是shuffle类的算子,以这个为依据又能定位一波代码位置,此时只要看到哪个task消耗时间长,那就知道了是哪个stage(shuffle算子)出现了问题


比如上图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

2.1.4 查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

2.2 数据倾斜的解决方案

2.2.1 使用Hive ETL预处理数据

其实这招纯属是让一些费时间的操作留到凌晨的时候跑,然后第二天需要数据的时候直接拿到凌晨跑出来的结果过来用的方法(骗自己😂)。适用于Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作

通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。实现起来简单便捷,效果还非常好

但是这一招也会有它不能用的场景,也就是如果要玩实时的它就不能用了

2.2.2 过滤少数导致倾斜的key

如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜,刚刚提到的定位算子就立刻派上用场了,你不是数量多嘛,我就直接把你删了(又是一招骗自己的😂)

2.2.3 提高shuffle操作的并行度

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

该方案其实非常扯,其实根本无法解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。

前面3种方案都是些没啥用的玩意,都是逃避了问题的,从第4种开始方案就变得合理起来了。

2.2.4 两阶段聚合(局部聚合+全局聚合)

对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

核心实现思路就是进行两阶段聚合。

  1. 第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,
    比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) ,就会变成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)

  2. 接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)

  3. 然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2)

  4. 再次进行全局聚合操作,就可以得到最终结果了,最后是(hello, 4)

这个方法的优点是对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。可是缺点也很明显,它仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

2.2.5 将reduce join转为map join

在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。

优点:对join操作导致的数据倾斜,直接避开了shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。因为我们需要将小表进行广播,会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

2.2.6 (最nice的)采样倾斜key并分拆join操作

两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用2.2.5,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

  1. 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key
  2. 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  3. 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据扩容成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。

到这一步我们可以得出这个套路,如果RDDA1或者RDDB1中有一份数据量较小可以满足方案5的条件,那就直接执行方案5

可是其实在实际的生产环境中,就是两个都非常大的情况,所以我们要继续进行改良

  1. 再将附加了随机前缀的独立RDD与另一个扩容n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  2. 而另外两个普通的RDD就照常join即可。
  3. 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了

PS:不需要觉得扩容3倍觉得这样不好,发生数据倾斜有可能会导致一个任务计算好几天甚至十几天,相比于这种风险,这个扩容付出的代价是值得的

优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

2.2.7 使用随机前缀和扩容RDD进行join

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用这一种方案来解决问题了。简单粗暴,不讲道理😤

  1. 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  2. 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  3. 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  4. 最后将两个处理后的RDD进行join即可。

上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

把上面的几种数据倾斜的解决方案综合的灵活运行,这样可以保证日常遇到的问题基本都能有思路解决


三、shuffle调优方面

Spark Shuffle在Spark1.3之前发展不太成熟,那时候可能面试提及非常多,现在的面试其实已经不太会去问这一块的知识了,但是如果我们要深入地去了解一门技术,那还是得把这些细枝末节给抠一下,了解一下前世今生。

这里的调优内容主要就是把上一篇Spark的Shuffle总结分析里面提到的参数给再次说明一下

3.1 (提升性能)spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数

3.2 (提升性能)spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数

3.3 (提高稳定)spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

3.4 (提高稳定)spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

3.5 (内存模型)spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:(Spark1.6是这个参数,1.6以后参数变成spark.memory.fraction)该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

3.6 spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。Spark1.6以后把hash方式给移除了,tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

3.7 spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

finally

万字不易,这也是关于Spark core的最后一篇了。希望对大家有所帮助。之后会更Spark Streaming方面的内容,感兴趣的朋友可以关注一下哦,公众号:说出你的愿望吧