A Summary of Split Algorithms for Hive's InputFormats


In Hive, the file format of a table can be specified in the CREATE TABLE statement:

CREATE TABLE `test`( 
  `id` string COMMENT '',  
  `name` string COMMENT ''
  ) 
COMMENT '' 
ROW FORMAT SERDE  
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'  
STORED AS INPUTFORMAT  
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  
OUTPUTFORMAT  
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' 
LOCATION 
  'viewfs://xxx' 

References: blog.csdn.net/Thomson617/… , spark.apache.org/docs/latest…

  • FileInputFormat

| Item | Details |
| --- | --- |
| Full class name | org.apache.hadoop.mapreduce.lib.input.FileInputFormat |
| Split size formula | Math.max(minSize, Math.min(maxSize, blockSize)) |
| maxSize | mapreduce.input.fileinputformat.split.maxsize, default Long.MAX_VALUE |
| minSize | mapreduce.input.fileinputformat.split.minsize, default 1 |
| blockSize | HDFS block size of the file, default 128 MB |

How the formula behaves: if maxSize is smaller than blockSize (min < max < block), the file is cut by maxSize, so one block yields several splits; if minSize is larger than blockSize (block < min < max), the file is cut by minSize, so one split spans several blocks; otherwise (min < block < max), the file is cut on block boundaries.
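Before diving into the Hadoop source below, here is a tiny standalone sketch (plain Java, not Hadoop code; the 128 MB block size and the min/max values are made up for illustration) that evaluates the same formula for the three cases:

public class SplitSizeDemo {
  // same formula as FileInputFormat.computeSplitSize
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long block = 128L * 1024 * 1024;                                     // 128 MB HDFS block
    // max < block: cut by maxSize -> prints 64 MB
    System.out.println(computeSplitSize(block, 1L, 64L * 1024 * 1024));
    // min > block: cut by minSize -> prints 256 MB
    System.out.println(computeSplitSize(block, 256L * 1024 * 1024, Long.MAX_VALUE));
    // min <= block <= max: cut by block size -> prints 128 MB
    System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE));
  }
}

The corresponding Hadoop source, computeSplitSize plus the split loop in getSplits, is: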
 protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
 }
 
 if (isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      // compute the size of each split
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);
      long bytesRemaining = length;
      // SPLIT_SLOP = 1.1: a remaining tail within 10% of the split size is not cut into its own split
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        // find which block holds the start offset of the split being created
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
        bytesRemaining -= splitSize;
      }
      // the bytes left over become the final, smaller split
      if (bytesRemaining != 0) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                   blkLocations[blkIndex].getHosts(),
                   blkLocations[blkIndex].getCachedHosts()));
      }
    } else { // not splitable
      splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                  blkLocations[0].getCachedHosts()));
}
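In practice these two configuration keys are how you steer the algorithm. A minimal sketch of setting them on a MapReduce job (the 64 MB value is just an example):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuningExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // either set the keys directly ...
    job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
    job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    // ... or use the FileInputFormat helpers, which write the same keys
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    FileInputFormat.setMinInputSplitSize(job, 1L);
  }
}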
  • OrcInputFormat

| Item | Details |
| --- | --- |
| Full class name | org.apache.hadoop.hive.ql.io.orc.OrcInputFormat |
| strategy | hive.exec.orc.split.strategy, default HYBRID |
| maxSize | mapreduce.input.fileinputformat.split.maxsize, default 256 * 1024 * 1024 in OrcInputFormat |
| minSize | mapreduce.input.fileinputformat.split.minsize, default 16 * 1024 * 1024 in OrcInputFormat |
| Split rule | BI: one split per file; ETL: stripes are merged into splits no smaller than minSize; HYBRID: choose ETL or BI based on the average file size |

hive.exec.orc.split.strategy controls how splits are generated when reading an ORC table. The BI strategy creates splits at file granularity; the ETL strategy cuts inside files, grouping several stripes into one split; the HYBRID strategy uses ETL when the average file size exceeds the maximum split size (256 * 1024 * 1024 by default) and BI otherwise.

For larger ORC tables the footers can be sizable, so the ETL strategy may pull a lot of data from HDFS just to compute splits and can even OOM the driver; reading such tables with the BI strategy is recommended. For smaller tables, especially skewed ones (skewed here meaning that most stripes sit in a few files), the ETL strategy is recommended.

In addition, spark.hadoop.mapreduce.input.fileinputformat.split.minsize controls stripe merging during ORC split generation: while the combined size of consecutive stripes stays below this value, they are merged into a single task. Lowering it therefore increases the parallelism of ORC reads.
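As a rough illustration, and assuming you build the Hadoop Configuration used for the read yourself (in Spark the same keys are passed with the spark.hadoop. prefix), the knobs could be set like this:

import org.apache.hadoop.conf.Configuration;

public class OrcSplitTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // pin a strategy explicitly instead of relying on HYBRID
    conf.set("hive.exec.orc.split.strategy", "BI");                               // or "ETL"
    // smaller minSize -> less stripe merging -> more splits, higher parallelism
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 8L * 1024 * 1024);
    // maxSize caps how many stripes the ETL strategy packs into one split
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
  }
}

The strategy selection inside OrcInputFormat looks like this: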
switch(context.splitStrategyKind) {
          case BI:
            // BI strategy requested through config
            splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
                deltas, covered);
            break;
          case ETL:
            // ETL strategy requested through config
            splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
                deltas, covered);
            break;
          default:
            // HYBRID strategy
            if (avgFileSize > context.maxSize) {
              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
                  covered);
            } else {
              splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
                  covered);
            }
            break;
        }
  
  /**
   * BI strategy is used when the requirement is to spend less time in split generation
   * as opposed to query execution (split generation does not read or cache file footers).
   */
  static final class BISplitStrategy extends ACIDSplitStrategy {
    List<FileStatus> fileStatuses;
    boolean isOriginal;
    List<Long> deltas;
    FileSystem fs;
    Context context;
    Path dir;

    public BISplitStrategy(Context context, FileSystem fs,
        Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
        List<Long> deltas, boolean[] covered) {
      super(dir, context.numBuckets, deltas, covered);
      this.context = context;
      this.fileStatuses = fileStatuses;
      this.isOriginal = isOriginal;
      this.deltas = deltas;
      this.fs = fs;
      this.dir = dir;
    }

    @Override
    public List<OrcSplit> getSplits() throws IOException {
      List<OrcSplit> splits = Lists.newArrayList();
      for (FileStatus fileStatus : fileStatuses) {
        String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
            .getHosts();
        OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
            null, isOriginal, true, deltas, -1);
        splits.add(orcSplit);
      }

      // add uncovered ACID delta splits
      splits.addAll(super.getSplits());
      return splits;
    }

    @Override
    public String toString() {
      return BISplitStrategy.class.getSimpleName() + " strategy for " + dir;
    }
  }
/**
   * ETL strategy is used when spending little more time in split generation is acceptable
   * (split generation reads and caches file footers).
   */
  static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
    Context context;
    FileSystem fs;
    List<FileStatus> files;
    boolean isOriginal;
    List<Long> deltas;
    Path dir;
    boolean[] covered;

    public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
        boolean isOriginal, List<Long> deltas, boolean[] covered) {
      this.context = context;
      this.dir = dir;
      this.fs = fs;
      this.files = children;
      this.isOriginal = isOriginal;
      this.deltas = deltas;
      this.covered = covered;
    }

    private FileInfo verifyCachedFileInfo(FileStatus file) {
      context.numFilesCounter.incrementAndGet();
      FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
      if (fileInfo != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Info cached for path: " + file.getPath());
        }
        if (fileInfo.modificationTime == file.getModificationTime() &&
            fileInfo.size == file.getLen()) {
          // Cached copy is valid
          context.cacheHitCounter.incrementAndGet();
          return fileInfo;
        } else {
          // Invalidate
          Context.footerCache.invalidate(file.getPath());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Meta-Info for : " + file.getPath() +
                " changed. CachedModificationTime: "
                + fileInfo.modificationTime + ", CurrentModificationTime: "
                + file.getModificationTime()
                + ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
                file.getLen());
          }
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Info not cached for path: " + file.getPath());
        }
      }
      return null;
    }

    @Override
    public List<SplitInfo> getSplits() throws IOException {
      List<SplitInfo> result = Lists.newArrayList();
      for (FileStatus file : files) {
        FileInfo info = null;
        if (context.cacheStripeDetails) {
          info = verifyCachedFileInfo(file);
        }
        // ignore files of 0 length
        if (file.getLen() > 0) {
          result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
        }
      }
      return result;
    }

    @Override
    public String toString() {
      return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
    }
  }
      // Stripe-merging loop from OrcInputFormat's split generation (ETL path):
      // consecutive stripes accumulate into one split until it crosses an HDFS block
      // boundary while already larger than minSize, or reaches maxSize.
      long currentOffset = -1;
      long currentLength = 0;
      int idx = -1;
      for (StripeInformation stripe : stripes) {
        idx++;

        if (!includeStripe[idx]) {
          // create split for the previous unfinished stripe
          if (currentOffset != -1) {
            splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
            currentOffset = -1;
          }
          continue;
        }

        // if we are working on a stripe, over the min stripe size, and
        // crossed a block boundary, cut the input split here.
        if (currentOffset != -1 && currentLength > context.minSize &&
            (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
          splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
          currentOffset = -1;
        }
        // if we aren't building a split, start a new one.
        if (currentOffset == -1) {
          currentOffset = stripe.getOffset();
          currentLength = stripe.getLength();
        } else {
          currentLength =
              (stripe.getOffset() + stripe.getLength()) - currentOffset;
        }
        if (currentLength >= context.maxSize) {
          splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
          currentOffset = -1;
        }
      }
      if (currentOffset != -1) {
        splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
      }

      // add uncovered ACID delta splits
      splits.addAll(deltaSplits);