Spark学习——性能调优(三)

1,019 阅读5分钟

其他更多java基础文章:
java基础学习(目录)


继续上一篇Spark学习——性能调优(二)的讲解

使用MapPartition提升性能

什么时候比较适合用MapPartitions系列操作?

数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。

在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。

但是试了一下以后,发现,不行,OOM了,那就放弃吧。

filter过后使用coalesce减少分区数量

默认情况下,经过了这种filter之后,RDD中的每个partition的数据量,可能都不太一样了。(原本每个partition的数据量可能是差不多的) 这可能会导致的问题:

  1. 每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。
  2. 每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生数据倾斜。

针对上述的两个问题,我们希望应该能够怎么样?

  1. 针对第一个问题,我们希望可以进行partition的压缩吧,因为数据量变少了,那么partition其实也完全可以对应的变少。比如原来是4个partition,现在完全可以变成2个partition。那么就只要用后面的2个task来处理即可。就不会造成task计算资源的浪费。(不必要,针对只有一点点数据的partition,还去启动一个task来计算)

  2. 针对第二个问题,其实解决方案跟第一个问题是一样的;也是去压缩partition,尽量让每个partition的数据量差不多。那么这样的话,后面的task分配到的partition的数据量也就差不多。不会造成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。

coalesce算子
主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。

从而便于后面的task进行计算操作,在某种程度上,能够一定程度的提升性能。

RDD.filter(XXX).coalesce(100);

使用foreachPartition优化

使用repatition解决Spark SQL低并行度

前说过,并行度是自己可以调节,或者说是设置的。

1、spark.default.parallelism
2、textFile(),传入第二个参数,指定partition数量(比较少用)

官方推荐,根据你的application的总cpu core数量(在spark-submit中可以指定,比如 200个),自己手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。

你设置的这个并行度,在哪些情况下会生效?哪些情况下,不会生效?

如果你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认所有stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)

问题来了,如果用Spark SQL,那含有Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。

比如你第一个stage,用了Spark SQL从hive表中查询出了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle操作之后,做了一些transformation操作。hive表,对应了一个hdfs文件,有20个block;你自己设置了spark.default.parallelism参数为100。

你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,才会变成你自己设置的那个并行度,100。

问题在哪里?

Spark SQL默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也可能没什么问题。Spark SQL所在的那个stage中,后面的那些transformation操作,可能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的Spark SQL默认把task数量设置的很少,20个,然后每个task要处理为数不少的数据量,然后还要执行特别复杂的算法。

这个时候,就会导致第一个stage的速度,特别慢。第二个stage,刷刷刷,非常快。

如何解决
repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。

然后呢,从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。

return dataDF.javaRDD().repartition(1000);

使用reduceByKey本地聚合

用reduceByKey对性能的提升:

  1. 在本地进行聚合以后,在map端的数据量就变少了,减少磁盘IO。而且可以减少磁盘空间的占用。
  2. 下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。
  3. 在reduce端进行数据缓存的内存占用变少了。
  4. reduce端,要进行聚合的数据量也变少了。