hive中可在建表语句中指定fileformat
-- Example Hive DDL with an explicit file format (as produced by SHOW CREATE TABLE).
CREATE TABLE `test`(
`id` string COMMENT '',
`name` string COMMENT ''
)
COMMENT ''
-- SerDe performs row-level (de)serialization for the ORC format.
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
-- InputFormat/OutputFormat control how files are read and written;
-- the InputFormat is also what computes input splits on read (see below).
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
-- Table data location on the (federated, viewfs) HDFS namespace.
LOCATION
'viewfs://xxx'
参考链接:blog.csdn.net/Thomson617/… 、 spark.apache.org/docs/latest…
-
FileInputFormat
说明 | 详情 |
---|---|
全类名 | org.apache.hadoop.mapreduce.lib.input.FileInputFormat |
计算公式 | Math.max(minSize, Math.min(maxSize, blockSize)) |
maxSize | mapreduce.input.fileinputformat.split.maxsize,默认 Long.MAX_VALUE |
minSize | mapreduce.input.fileinputformat.split.minsize,默认 1 |
blockSize | hdfs上设置的一个块的大小,默认128M |
算法含义 | 若maxSize小于blockSize(min<max<block),则按照maxSize切分文件(一个block切分成多个split);若minSize大于blockSize(block<min<max),则按照minSize切分文件(多个block组成一个split);否则(min<block<max),按照block切分文件 |
/**
 * Computes the effective split size: the file's HDFS block size clamped
 * into [minSize, maxSize].
 *
 * - maxSize < blockSize : splits smaller than a block (one block -> several splits)
 * - minSize > blockSize : splits larger than a block (several blocks -> one split)
 * - otherwise           : split size equals the block size
 */
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
// Excerpt from FileInputFormat.getSplits(): carve one file into input splits.
// NOTE(review): job, path, file, length, minSize, maxSize, blkLocations, splits
// and SPLIT_SLOP are defined earlier in the enclosing method/class (not shown here).
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// Size of each split (blockSize clamped into [minSize, maxSize], see computeSplitSize).
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// SPLIT_SLOP = 1.1: a trailing remainder of up to 10% of splitSize stays attached
// to the last split instead of becoming a tiny split of its own.
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// Which block contains the start offset of the split being created.
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// Remainder of the file that was not carved off above becomes the final split.
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable: the whole file becomes a single split
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
-
OrcInputFormat
说明 | 详情 |
---|---|
全类名 | org.apache.hadoop.hive.ql.io.orc.OrcInputFormat |
strategy | hive.exec.orc.split.strategy 默认 HYBRID |
maxSize | mapreduce.input.fileinputformat.split.maxsize,默认 256 * 1024 * 1024(256MB) |
minSize | mapreduce.input.fileinputformat.split.minsize,默认 1 |
计算公式 | strategy = HYBRID? split by file (BI) : merge stripe if less than minSize(ETL) |
算法说明 | hive.exec.orc.split.strategy参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。 --- 对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。--- 另外,spark.hadoop.mapreduce.input.fileinputformat.split.minsize参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小小于spark.hadoop.mapreduce.input.fileinputformat.split.minsize时,会合并到一个task中处理。可以适当调小该值,以此增大读ORC表的并发。 |
// Excerpt from OrcInputFormat: choose the split-generation strategy for one directory.
// NOTE(review): context, fs, dir, children, isOriginal, deltas, covered and
// avgFileSize come from the enclosing method (not shown here).
switch(context.splitStrategyKind) {
case BI:
// BI strategy requested through config
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
case ETL:
// ETL strategy requested through config
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
default:
// HYBRID strategy: use ETL (stripe-level splitting) when the average file size
// exceeds context.maxSize, otherwise BI (one split per file).
if (avgFileSize > context.maxSize) {
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
} else {
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
}
break;
}
/**
 * BI strategy is used when the requirement is to spend less time in split generation
 * as opposed to query execution (split generation does not read or cache file footers).
 * Each file becomes exactly one OrcSplit, so no stripe/footer information is needed.
 */
static final class BISplitStrategy extends ACIDSplitStrategy {
List<FileStatus> fileStatuses;  // files under 'dir' to turn into splits, one split each
boolean isOriginal;             // whether the files are in the original (pre-ACID) layout
List<Long> deltas;              // ACID delta directory transaction ids
FileSystem fs;
Context context;
Path dir;
public BISplitStrategy(Context context, FileSystem fs,
Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
List<Long> deltas, boolean[] covered) {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fileStatuses = fileStatuses;
this.isOriginal = isOriginal;
this.deltas = deltas;
this.fs = fs;
this.dir = dir;
}
@Override
public List<OrcSplit> getSplits() throws IOException {
List<OrcSplit> splits = Lists.newArrayList();
for (FileStatus fileStatus : fileStatuses) {
// Whole file -> one split; host list is taken from the file's first block entry only.
String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
.getHosts();
// Offset 0 and length = full file size: the file is never subdivided in BI mode.
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
null, isOriginal, true, deltas, -1);
splits.add(orcSplit);
}
// add uncovered ACID delta splits
splits.addAll(super.getSplits());
return splits;
}
@Override
public String toString() {
return BISplitStrategy.class.getSimpleName() + " strategy for " + dir;
}
}
/**
 * ETL strategy is used when spending little more time in split generation is acceptable
 * (split generation reads and caches file footers).
 * Produces one SplitInfo per non-empty file; stripe-level split/merge happens later
 * using the footer metadata gathered here.
 */
static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
Context context;
FileSystem fs;
List<FileStatus> files;  // files under 'dir' whose footers will be read (or cache-hit)
boolean isOriginal;      // whether the files are in the original (pre-ACID) layout
List<Long> deltas;       // ACID delta directory transaction ids
Path dir;
boolean[] covered;
public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
boolean isOriginal, List<Long> deltas, boolean[] covered) {
this.context = context;
this.dir = dir;
this.fs = fs;
this.files = children;
this.isOriginal = isOriginal;
this.deltas = deltas;
this.covered = covered;
}
// Returns the cached footer info for 'file' when both its modification time and
// length still match the cache entry; otherwise invalidates the stale entry and
// returns null (caller will re-read the footer).
private FileInfo verifyCachedFileInfo(FileStatus file) {
context.numFilesCounter.incrementAndGet();
FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
if (fileInfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Info cached for path: " + file.getPath());
}
if (fileInfo.modificationTime == file.getModificationTime() &&
fileInfo.size == file.getLen()) {
// Cached copy is valid
context.cacheHitCounter.incrementAndGet();
return fileInfo;
} else {
// Invalidate: the file changed since the footer was cached.
Context.footerCache.invalidate(file.getPath());
if (LOG.isDebugEnabled()) {
LOG.debug("Meta-Info for : " + file.getPath() +
" changed. CachedModificationTime: "
+ fileInfo.modificationTime + ", CurrentModificationTime: "
+ file.getModificationTime()
+ ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
file.getLen());
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Info not cached for path: " + file.getPath());
}
}
return null;
}
@Override
public List<SplitInfo> getSplits() throws IOException {
List<SplitInfo> result = Lists.newArrayList();
for (FileStatus file : files) {
FileInfo info = null;
if (context.cacheStripeDetails) {
// Reuse a cached footer if it is still valid; null triggers a fresh footer read.
info = verifyCachedFileInfo(file);
}
// ignore files of 0 length
if (file.getLen() > 0) {
result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
}
}
return result;
}
@Override
public String toString() {
return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
}
}
// Excerpt from OrcInputFormat (ETL path): merge consecutive ORC stripes into splits.
// NOTE(review): stripes, includeStripe, splits, blockSize, fileMetaInfo, context
// and deltaSplits come from the enclosing method (not shown here).
long currentOffset = -1;  // start offset of the split being built (-1 = none in progress)
long currentLength = 0;   // length of the split being built
int idx = -1;
for (StripeInformation stripe : stripes) {
idx++;
if (!includeStripe[idx]) {
// Stripe excluded (filtered out): flush any in-progress split and skip it.
if (currentOffset != -1) {
splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
currentOffset = -1;
}
continue;
}
// if we are working on a stripe, over the min stripe size, and
// crossed a block boundary, cut the input split here.
if (currentOffset != -1 && currentLength > context.minSize &&
(currentOffset / blockSize != stripe.getOffset() / blockSize)) {
splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
currentOffset = -1;
}
// if we aren't building a split, start a new one.
if (currentOffset == -1) {
currentOffset = stripe.getOffset();
currentLength = stripe.getLength();
} else {
// Extend the current split to also cover this stripe — this is the
// "merge stripes while below minSize" behavior described in the table above.
currentLength =
(stripe.getOffset() + stripe.getLength()) - currentOffset;
}
// Close the split as soon as it reaches maxSize.
if (currentLength >= context.maxSize) {
splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
currentOffset = -1;
}
}
// Flush the trailing in-progress split, if any.
if (currentOffset != -1) {
splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
}
// add uncovered ACID delta splits
splits.addAll(deltaSplits);