A Professional Deep Dive into Spark Shuffle Core Parameter Tuning and Its Source Code: Spark in Commercial Production


This blog series distills and shares cases drawn from real commercial environments and offers hands-on guidance for commercial Spark applications; please stay tuned. Copyright notice: this Spark commercial-practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.

1 Spark Runtime Resource Configuration

    ./bin/spark-submit \
    --master yarn-cluster \
    --num-executors 100 \
    --executor-memory 6G \
    --executor-cores 4 \
    --driver-memory 1G \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \
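
To get a feel for what those two fractions mean, here is a rough back-of-the-envelope sketch that applies the legacy StaticMemoryManager formula quoted later in this article (systemMaxMemory * memoryFraction * safetyFraction); it ignores JVM overhead, so real numbers will be slightly smaller:

    // Rough sizing sketch for the submit command above (legacy static memory model).
    val executorHeapMb = 6 * 1024                // --executor-memory 6G
    val storageMb = executorHeapMb * 0.5 * 0.9   // spark.storage.memoryFraction * safetyFraction ≈ 2765 MB
    val shuffleMb = executorHeapMb * 0.3 * 0.8   // spark.shuffle.memoryFraction * safetyFraction ≈ 1475 MB
    println(f"storage ≈ $storageMb%.0f MB, shuffle ≈ $shuffleMb%.0f MB")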

2 Spark Shuffle Core Parameter Tuning (Source Code Walkthrough)


  • spark.reducer.maxSizeInFlight

  • Default: 48m

  • Description: sets the buffer size for shuffle read tasks; this buffer determines how much data can be pulled in a single fetch.

  • Tuning advice: if the job has memory to spare, increase the value (for example to 96m) so that fewer fetches, and therefore fewer network round trips, are needed. In practice a well-chosen value yields roughly a 1%-5% performance gain. (A usage sketch follows the source excerpt below.)

      /**
       * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
       * requesting them from other nodes' block stores.
       */
    
      private[spark] class BlockStoreShuffleReader[K, C](
          handle: BaseShuffleHandle[K, _, C],
          startPartition: Int,
          endPartition: Int,
          context: TaskContext,
          serializerManager: SerializerManager = SparkEnv.get.serializerManager,
          blockManager: BlockManager = SparkEnv.get.blockManager,
          mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
        extends ShuffleReader[K, C] with Logging {
      
        private val dep = handle.dependency
      
        /** Read the combined key-values for this reduce task */
        override def read(): Iterator[Product2[K, C]] = {
        
          val wrappedStreams = new ShuffleBlockFetcherIterator(
            context,
            blockManager.shuffleClient,
            blockManager,
            mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
            serializerManager.wrapStream,
            // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
            
            SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,       <= key line
            
            SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
            SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
            SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
            SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
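
As a usage sketch (the 96m value is just the example from the tuning advice above, not a universal recommendation), the buffer can be raised programmatically before building the SparkContext, or equivalently with --conf on spark-submit:

    // Sketch: raise the shuffle-read fetch buffer from 48m to 96m.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.reducer.maxSizeInFlight", "96m")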
    

  • spark.shuffle.io.maxRetries

  • Default: 3

  • Description: when a shuffle read task pulls its data from the node hosting the shuffle write task, a fetch that fails because of a network error is retried automatically; this parameter is the maximum number of retries. If the fetch still has not succeeded within that limit, the job may fail.

  • Tuning advice: for jobs whose shuffles are particularly time-consuming, raise the retry limit (for example to 60) to ride out fetch failures caused by JVM full GCs or an unstable network. In practice, for shuffles over very large data sets (billions to over ten billion records), tuning this parameter greatly improves stability.

    public TransportConf(String module, ConfigProvider conf) {
      this.module = module;
      this.conf = conf;
      SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
      SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
      SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
      SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
      SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY =  getConfKey("io.numConnectionsPerPeer");
      SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
      SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
      SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
      SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
      SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
      SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
      SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
      SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
      SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
    }
    

  • spark.shuffle.io.retryWait
  • Default: 5s
  • Description: the wait between successive retries when a shuffle read task re-attempts a failed fetch from the node hosting the shuffle write task; the default is 5s.
  • Tuning advice: lengthen the interval (for example to 60s) to make shuffle operations more stable. (A combined sketch with spark.shuffle.io.maxRetries follows.)
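
A minimal sketch combining the two fetch-retry settings above, using the 60 retries / 60s wait suggested in the tuning advice:

    // Sketch: make shuffle fetches more tolerant of full GCs and flaky networks.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.shuffle.io.maxRetries", "60")
      .set("spark.shuffle.io.retryWait", "60s")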

  • spark.shuffle.memoryFraction
  • Default: 0.2
  • Description: the fraction of executor memory reserved for shuffle read tasks to perform aggregation; the default is 20%.
  • Tuning advice: this was already discussed under resource tuning. If memory is plentiful and persistence is rarely used, raise this fraction so the shuffle's aggregation has more memory and spills to disk less often. In practice a well-chosen value improves performance by around 10%. (A sketch follows.)
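
A sketch of raising the shuffle fraction. Note that these legacy fractions are only honored by the StaticMemoryManager (from Spark 1.6 onward that means setting spark.memory.useLegacyMode=true), which the next subsection walks through:

    // Sketch: give shuffle aggregation a larger slice under the legacy static memory model.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.memory.useLegacyMode", "true")   // opt back into StaticMemoryManager (Spark 1.6+)
      .set("spark.shuffle.memoryFraction", "0.3")
      .set("spark.storage.memoryFraction", "0.5")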

This is where memory management takes center stage:

(1) StaticMemoryManager: static memory allocation

  private def getMaxStorageMemory(conf: SparkConf): Long = {
  
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)          <= key line
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)          <= key line
    (systemMaxMemory * memoryFraction * safetyFraction).toLong                        <= key line
  }


    private def getMaxExecutionMemory(conf: SparkConf): Long = {
    
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

    if (systemMaxMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)              <= key line
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)              <= key line
    (systemMaxMemory * memoryFraction * safetyFraction).toLong                            <= key line
  }



  // Defined in UnifiedMemoryManager; referenced by getMaxMemory in section (2) below.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024                            <= key line

(2) UnifiedMemoryManager: unified memory allocation

    /**
       * Return the total amount of memory shared between execution and storage, in bytes.
       */
      private def getMaxMemory(conf: SparkConf): Long = {
      
        val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)    <= key line
        val reservedMemory = conf.getLong("spark.testing.reservedMemory",                        <= key line
        
          if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
        val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
        
        if (systemMemory < minSystemMemory) {
          throw new IllegalArgumentException(s"System memory $systemMemory must " +
            s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
            s"option or spark.driver.memory in Spark configuration.")
        }
        // SPARK-12759 Check executor memory to fail fast if memory is insufficient
        if (conf.contains("spark.executor.memory")) {
          val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
          if (executorMemory < minSystemMemory) {
            throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
              s"$minSystemMemory. Please increase executor memory using the " +
              s"--executor-memory option or spark.executor.memory in Spark configuration.")
          }
        }
        val usableMemory = systemMemory - reservedMemory
        
        val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)                       <= key line
        (usableMemory * memoryFraction).toLong
      }


      def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
        val maxMemory = getMaxMemory(conf)
        new UnifiedMemoryManager(
          conf,
          maxHeapMemory = maxMemory,
          onHeapStorageRegionSize =
            (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,          <= key line
          numCores = numCores)
      }
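
Plugging a 6 GB executor heap into getMaxMemory and apply above gives a feel for the unified pool (a rough sketch; Runtime.getRuntime.maxMemory is somewhat smaller than -Xmx, so real values are a bit lower):

    // Sketch: approximate unified-memory sizing for a 6 GB executor heap.
    val systemMemoryMb  = 6 * 1024
    val reservedMb      = 300                         // RESERVED_SYSTEM_MEMORY_BYTES
    val usableMb        = systemMemoryMb - reservedMb // 5844 MB
    val unifiedPoolMb   = usableMb * 0.6              // spark.memory.fraction        ≈ 3506 MB
    val storageRegionMb = unifiedPoolMb * 0.5         // spark.memory.storageFraction ≈ 1753 MB
    println(f"unified pool ≈ $unifiedPoolMb%.0f MB, storage region ≈ $storageRegionMb%.0f MB")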

  • spark.shuffle.manager

  • Default: sort

  • Description: selects the ShuffleManager implementation. As of Spark 1.5 there were three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; from Spark 1.2 onward the default is SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management from project Tungsten, making memory usage more efficient.

  • Tuning advice: SortShuffleManager sorts records by default, so if your business logic needs that ordering, the default SortShuffleManager is fine. If it does not, consider the parameters discussed below to avoid the sort via the bypass mechanism or the optimized HashShuffleManager, while keeping good disk read/write performance. Be cautious with tungsten-sort, since bugs have been found in it in the past.

  • Configuration key: spark.shuffle.manager, default sort.

     val shortShuffleMgrNames = Map(
        "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
        "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    

  • spark.shuffle.sort.bypassMergeThreshold
  • Default: 200
  • Description: with SortShuffleManager, if the number of shuffle read tasks is below this threshold (200 by default), the shuffle write skips sorting and writes data the way the unoptimized HashShuffleManager would, except that all temporary disk files produced by each task are finally merged into a single file, with a separate index file.
  • Tuning advice: when you use SortShuffleManager and genuinely do not need sorting, raise this threshold above the number of shuffle read tasks so the bypass mechanism kicks in and the map side skips sorting, saving its cost. Note that this path still produces a large number of intermediate disk files, so shuffle write performance is not ideal.

  • spark.shuffle.consolidateFiles
  • Default: false
  • Description: only effective with HashShuffleManager. When set to true, the consolidate mechanism is enabled and shuffle write output files are merged aggressively; when the number of shuffle read tasks is very large, this dramatically reduces disk I/O and improves performance.
  • Tuning advice: if you really do not need SortShuffleManager's sorting, then besides the bypass mechanism you can set spark.shuffle.manager to hash to use HashShuffleManager with consolidation enabled. In practice this has measured 10%-30% faster than SortShuffleManager with bypass enabled.

  • spark.shuffle.blockTransferService
  • Default: netty
  • Implements the transfer of shuffle blocks between executors; netty and nio implementations are available.

  • spark.shuffle.compress

  • Default: true

  • Controls whether the mapper-side output of every shuffle is compressed. For example, when thousands or tens of thousands of machines take part in an aggregation, the data volume and network traffic become very large, driving up memory, disk I/O, and network I/O. Compressing on the mapper side reduces the network cost of the next stage fetching data from the previous stage.

       /**
        * Merge zero or more spill files together, choosing the fastest merging strategy based on the
        * number of spills and the IO compression codec.
        * @return the partition lengths in the merged file.
        */
     
       private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
    
         final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);   <= key line
         
         final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
         
         final boolean fastMergeEnabled =
           sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);                    <= key line
           
         final boolean fastMergeIsSupported = !compressionEnabled ||
           CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);    <= key line
           
         final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
         try {
           if (spills.length == 0) {
             new FileOutputStream(outputFile).close(); // Create an empty file
             return new long[partitioner.numPartitions()];
           } else if (spills.length == 1) {
             // Here, we don't need to perform any metrics updates because the bytes written to this
             // output file would have already been counted as shuffle bytes written.
             Files.move(spills[0].file, outputFile);
             return spills[0].partitionLengths;
           } else {
             final long[] partitionLengths;
             // There are multiple spills to merge, so none of these spill files' lengths were counted
             // towards our shuffle write count or shuffle write time. If we use the slow merge path,
             // then the final output file's size won't necessarily be equal to the sum of the spill
             // files' sizes. To guard against this case, we look at the output file's actual size when
             // computing shuffle bytes written.
             //
             // We allow the individual merge methods to report their own IO times since different merge
             // strategies use different IO techniques.  We count IO during merge towards the shuffle
             // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
             // branch in ExternalSorter.
             
             if (fastMergeEnabled && fastMergeIsSupported) {
             
               // Compression is disabled or we are using an IO compression codec that supports
               // decompression of concatenated compressed streams, so we can perform a fast spill merge
               // that doesn't need to interpret the spilled bytes.
               if (transferToEnabled && !encryptionEnabled) {
                 logger.debug("Using transferTo-based fast merge");
                 partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
               } else {
                 logger.debug("Using fileStream-based fast merge");
                 partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
               }
             } else {
               logger.debug("Using slow merge");
               partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
             }
             // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
             // in-memory records, we write out the in-memory records to a file but do not count that
             // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
             // to be counted as shuffle write, but this will lead to double-counting of the final
             // SpillInfo's bytes.
             writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
             writeMetrics.incBytesWritten(outputFile.length());
             return partitionLengths;
           }
         } catch (IOException e) {
           if (outputFile.exists() && !outputFile.delete()) {
             logger.error("Unable to delete output file {}", outputFile.getPath());
           }
           throw e;
         }
       }
    

  • spark.io.compression.codec

  • Compresses internal data such as RDD partitions, broadcast variables, and shuffle output. The available codecs include lz4, lzf, and snappy (plus zstd in recent releases, as the source below shows). lzf compresses more tightly than snappy, so it is worth considering for shuffle-heavy jobs. (A usage sketch follows the source excerpt below.)

  • Default: snappy (newer Spark releases default to lz4)

      private[spark] object CompressionCodec {
        private val configKey = "spark.io.compression.codec"
        private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
          (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
            || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
        }
      
        private val shortCompressionCodecNames = Map(
          "lz4" -> classOf[LZ4CompressionCodec].getName,
          "lzf" -> classOf[LZFCompressionCodec].getName,
          "snappy" -> classOf[SnappyCompressionCodec].getName,
          "zstd" -> classOf[ZStdCompressionCodec].getName)
    

  • spark.shuffle.file.buffer

  • Default: 32k (small enough to succeed even on minimal hardware)

  • Description: the size of the BufferedOutputStream buffer used by shuffle write tasks. Data is first written into this buffer and only spilled to the disk file once the buffer is full.

  • Tuning advice: if the job has memory to spare, increase the value (for example to 64k) to reduce the number of spills during shuffle write, and hence the number of disk I/O operations. In practice a well-chosen value yields roughly a 1%-5% gain. (A usage sketch follows the excerpts below.)

       private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
          ConfigBuilder("spark.shuffle.file.buffer")
            .doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
              "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
              "made in creating intermediate shuffle files.")
            .bytesConf(ByteUnit.KiB)
            .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
              s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
            .createWithDefaultString("32k")
    
    
    
      final class ShuffleExternalSorter extends MemoryConsumer {
      
        private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
      
        @VisibleForTesting
        static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
      
        private final int numPartitions;
        private final TaskMemoryManager taskMemoryManager;
        private final BlockManager blockManager;
        private final TaskContext taskContext;
        private final ShuffleWriteMetrics writeMetrics;
      
        /**
         * Force this sorter to spill when there are this many elements in memory.
         */
        private final int numElementsForSpillThreshold;
      
        /** The buffer size to use when writing spills using DiskBlockObjectWriter */
        private final int fileBufferSizeBytes;
      
        /** The buffer size to use when writing the sorted records to an on-disk file */
        private final int diskWriteBufferSize;
      
        /**
         * Memory pages that hold the records being sorted. The pages in this list are freed when
         * spilling, although in principle we could recycle these pages across spills (on the other hand,
         * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
         * itself).
         */
        private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
      
        private final LinkedList<SpillInfo> spills = new LinkedList<>();
      
        /** Peak memory used by this sorter so far, in bytes. **/
        private long peakMemoryUsedBytes;
      
        // These variables are reset after spilling:
        @Nullable private ShuffleInMemorySorter inMemSorter;
        @Nullable private MemoryBlock currentPage = null;
        private long pageCursor = -1;
      
        ShuffleExternalSorter(
            TaskMemoryManager memoryManager,
            BlockManager blockManager,
            TaskContext taskContext,
            int initialSize,
            int numPartitions,
            SparkConf conf,
            ShuffleWriteMetrics writeMetrics) {
          super(memoryManager,
            (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
            memoryManager.getTungstenMemoryMode());
          this.taskMemoryManager = memoryManager;
          this.blockManager = blockManager;
          this.taskContext = taskContext;
          this.numPartitions = numPartitions;
          
          // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
          this.fileBufferSizeBytes =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;            <= key line
              
          this.numElementsForSpillThreshold =
              (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
              
          this.writeMetrics = writeMetrics;
          
          this.inMemSorter = new ShuffleInMemorySorter(
            this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
            
          this.peakMemoryUsedBytes = getMemoryUsage();
          this.diskWriteBufferSize =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
        }
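
A usage sketch for the tuning advice above (64k is just the example value; size it to your memory headroom):

    // Sketch: double the per-stream shuffle-write buffer from 32k to 64k.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.shuffle.file.buffer", "64k")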
    

  • spark.shuffle.io.numConnectionsPerPeer

  • Netty only: reuses connections between hosts to reduce connection setup on large clusters.

  • Default: 1

        TransportConf:

        /** Number of concurrent connections between two nodes for fetching data. */
    
        public int numConnectionsPerPeer() {
          return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
        }
    

  • spark.shuffle.io.preferDirectBufs

  • Netty only: off-heap buffers effectively reduce garbage collection and buffer copies. Users who are short on off-heap memory can consider disabling this option to force Netty to allocate all of its memory on the heap. Default: true. (A sketch follows the excerpt below.)

    TransportConf:

      /** If true, we will prefer allocating off-heap byte buffers within Netty. */
        public boolean preferDirectBufs() {
          return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
        }
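
A sketch of disabling direct buffers when off-heap memory is tight, as described above:

    // Sketch: force Netty to allocate shuffle I/O buffers on the JVM heap.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.shuffle.io.preferDirectBufs", "false")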
    

  • spark.shuffle.service.enabled

  • Default: false. When set to true, the BlockManager reads the port configured by spark.shuffle.service.port at construction time; note that the BlockManager's shuffle client is then no longer the default BlockTransferService instance but an ExternalShuffleClient.

  • Enables the external shuffle service: a long-running auxiliary service inside each NodeManager that improves shuffle performance.

      private[spark] class BlockManager(
          executorId: String,
          rpcEnv: RpcEnv,
          val master: BlockManagerMaster,
          val serializerManager: SerializerManager,
          val conf: SparkConf,
          memoryManager: MemoryManager,
          mapOutputTracker: MapOutputTracker,
          shuffleManager: ShuffleManager,
          val blockTransferService: BlockTransferService,
          securityManager: SecurityManager,
          numUsableCores: Int)
        extends BlockDataManager with BlockEvictionHandler with Logging {
      
        private[spark] val externalShuffleServiceEnabled =
          conf.getBoolean("spark.shuffle.service.enabled", false)
    
        // Port used by the external shuffle service. In Yarn mode, this may already be
        // set through the Hadoop configuration as the server is launched in the Yarn NM.
        private val externalShuffleServicePort = {
          val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
          if (tmpPort == 0) {
            // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
            // an open port.  But we still need to tell our spark apps the right port to use.  So
            // only if the yarn config has the port set to 0, we prefer the value in the spark config
            conf.get("spark.shuffle.service.port").toInt
          } else {
            tmpPort
          }
        }
    
        // Client to read other executors' shuffle files. This is either an external service, or just the
        // standard BlockTransferService to directly connect to other Executors.
        private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
          val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
          new ExternalShuffleClient(transConf, securityManager,
            securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
        } else {
          blockTransferService
        }
    

The Yarn-side setup for dynamic resource allocation is as follows:

First, configure YARN's NodeManagers to support Spark's shuffle service.
(1) Modify yarn-site.xml on every NodeManager:
    <!-- modify -->
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <!-- add -->
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
      <name>spark.shuffle.service.port</name>
      <value>7337</value>
    </property>

(2) Copy $SPARK_HOME/lib/spark-1.5.0-yarn-shuffle.jar to ${HADOOP_HOME}/share/hadoop/yarn/lib/ on every NodeManager.
(3) Restart all NodeManagers. (The application-side settings are sketched below.)
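
The steps above only cover the YARN side. On the application side the job still has to opt in; a minimal sketch (dynamic-allocation bounds such as min/max executors are workload-specific and omitted here):

    // Sketch: enable the external shuffle service plus dynamic allocation in the application.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.port", "7337")
      .set("spark.dynamicAllocation.enabled", "true")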

  • spark.shuffle.sort.bypassMergeThreshold (revisited, with source)

  • Default: 200

  • Scenario: if the number of shuffle read tasks is below this threshold (200 by default), the shuffle write does not sort; it writes data the way the unoptimized HashShuffleManager would, then merges all the temporary disk files produced by each task into one file and builds a separate index. (A small example follows the source below.)

      private[spark] object SortShuffleWriter {
        def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
          // We cannot bypass sorting if we need to do map-side aggregation.
          if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
            false
          } else {
            val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
            dep.partitioner.numPartitions <= bypassMergeThreshold
          }
        }
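
To make shouldBypassMergeSort concrete, here is a small sketch (sc is assumed to be an existing SparkContext; 150 partitions is below the default threshold of 200):

    // Sketch: which shuffles can take the bypass path with the default threshold of 200.
    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
    // groupByKey declares no map-side combine, and 150 <= 200, so its shuffle write bypasses the sort.
    val grouped = pairs.groupByKey(150)
    // reduceByKey uses a map-side combine (an Aggregator), so it never takes the bypass path.
    val reduced = pairs.reduceByKey(_ + _, 150)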
    

  • spark.shuffle.spill

  • Default: true

  • Allows spilling to disk. (As the source below shows, since Spark 1.6 this setting is ignored and Spark always spills when necessary.)

      private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
        if (!conf.getBoolean("spark.shuffle.spill", true)) {
          logWarning(
            "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
              " Shuffle will continue to spill to disk when necessary.")
        }
    

  • spark.shuffle.spill.compress

  • Setting it to true is reasonable, since bandwidth is very often the first bottleneck.

  • Weigh the actual CPU, disk, and network capacity of your cluster before changing it.

      /**
       * Component which configures serialization, compression and encryption for various Spark
       * components, including automatic selection of which [[Serializer]] to use for shuffles.
       */
      private[spark] class SerializerManager(
          defaultSerializer: Serializer,
          conf: SparkConf,
          encryptionKey: Option[Array[Byte]]) {
    
       // Whether to compress broadcast variables that are stored
        private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
        // Whether to compress shuffle output that are stored
        private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
        // Whether to compress RDD partitions that are stored serialized
        private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
        // Whether to compress shuffle output temporarily spilled to disk
        private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
    

3 Summary

This article pulls together Spark's core shuffle parameters; a great deal of time went into reading the source code to pin down exactly where and under what conditions each parameter takes effect. A good write-up does not come easily; reproduction is prohibited, but you are welcome to study it.

Qin Kaixin, Shenzhen