文档写入原理
Lucene中写入文档的过程并不会立即将文档刷新到磁盘,而是先将文档写入内存缓冲区。
ElasticSearch后台刷写线程通过调用IndexShard#flush方法来触发刷写磁盘,只有达到一定的条件时,才会将内存缓冲区中的文档刷新到内核缓冲区的segment文件中。
ElasticSearch刷盘操作可以手动触发,同时默认情况下每1秒会触发一次刷新,也可以通过refresh_interval
参数进行修改。另外,如果Translog文件大小超过512M也会触发刷盘操作,此阈值可以通过index.translog.flush_threshold_size
来配置。
参考:www.elastic.co/guide/en/el…
刷盘过程中,会根据配置的合并策略来决定是否对多个小的segment文件进行合并,以减少segment文件的数量并提高查询效率。
同时刷盘过程中会暂停对Shard的写入操作,以保证数据的一致性。
刷盘生成新的Segment文件的同时,还会记录一个Commit Point文件,用来恢复索引数据。
Commit Point是指在Lucene索引中每个索引操作的事务性提交点。每个提交点都包括一个或多个Segment文件,它表示一个已经被完全索引的文档集合。在每个提交点之间,可以保证索引的一致性和可恢复性。当需要进行索引恢复操作时,可以将一个已经生成的提交点文件加载到内存中,并将之前所有的事务性操作重新执行一遍,从而恢复索引的状态。。
IndexWriter是Lucene框架的类。
IndexWriter#addDocuments,IndexWriter#updateDocuments和IndexWriter#softUpdateDocuments三个方法最终都是调用IndexWriter#updateDocuments方法。
IndexWriter通过调用DocumentsWriter实现将文档写入内存缓冲区,并加入待刷新队列,等待后台线程触发刷盘操作。
核心源码分析
InternalEngine#index
public IndexResult index(Index index) throws IOException {
//校验文档的ID字段是否为IdFieldMapper.NAME
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
//检查文档操作是否需要进行流控制(throttle)
final boolean doThrottle = index.origin().isRecovery() == false;
//尝试获取读锁
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
//检查文档操作的 seqNo 是否正确
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
int reservedDocs = 0;
//尝试获取文档 ID 的写锁和流控制(throttle)
try (
Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
) {
lastWriteNanos = index.startTime();
//获得文档操作的索引策略
final IndexingStrategy plan = indexingStrategyForOperation(index);
reservedDocs = plan.reservedDocs;
final IndexResult indexResult;
//如果索引策略中存在预处理错误(earlyResultOnPreFlightError),则返回该错误结果
if (plan.earlyResultOnPreFlightError.isPresent()) {
assert index.origin() == Operation.Origin.PRIMARY : index.origin();
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else {
//如果文档操作来自主分片(PRIMARY),则重新生成文档的seqNo
if (index.origin() == Operation.Origin.PRIMARY) {
index = new Index(
index.uid(),
index.parsedDoc(),
generateSeqNoForOperationOnPrimary(index),
index.primaryTerm(),
index.version(),
index.versionType(),
index.origin(),
index.startTime(),
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNo(),
index.getIfPrimaryTerm()
);
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
//更新seqNo最大值。
advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
}
} else {
//如果文档操作来自副本分片,将提供的序列号标记为可见,并根据需要更新max_seq_no
markSeqNoAsSeen(index.seqNo());
}
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
//如果索引策略要求将文档写入 Lucene 中
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
//将文档写入Lucene中,并返回写入结果
indexResult = indexIntoLucene(index, plan);
} else {
//如果索引策略不要求将文档写入Lucene中,则创建一个新的IndexResult对象返回
indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
index.seqNo(),
plan.currentNotFoundOrDeleted,
index.id()
);
}
}
//如果该文档不是来自于translog,则将其添加到translog中,并获得它的位置信息。
//Translog是直接写入磁盘的,这个过程确保了如果索引操作在写入内核之前失败,这个操作仍然可以被恢复。
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
//如果Translog写入成功,则将将文档的版本信息存储到 versionMap 中
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
}
//将当前索引操作的序列号标记为已处理
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
//事务日志地址为空,表示已经持久化至磁盘。
if (indexResult.getTranslogLocation() == null) {
assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
//将该序列号标记为已持久化
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
}
//记录执行索引操作所花费的时间
indexResult.setTook(System.nanoTime() - index.startTime());
//冻结 `indexResult` 对象,使其成为不可变的。这可以确保在多线程环境中对 `indexResult` 对象的访问是安全的
indexResult.freeze();
return indexResult;
} finally {
releaseInFlightDocs(reservedDocs);
}
} catch (RuntimeException | IOException e) {
...
}
}
InternalEngine#indexIntoLucene
该方法的参数IndexingStrategy
用于控制索引操作的行为.
addStaleOpToLucene属性表示当前的文档索引操作是否要添加到Lucene中,即使它已经被标记为过期或已被删除。
如果该变量为
true
,则当前的文档索引操作将被添加到Lucene中,即使它已被标记为过期或已被删除。如果该变量为
false
,则会执行其他的文档索引操作,例如添加新文档或更新现有文档
useLuceneUpdateDocument属性表示是否使用 Lucene 的更新文档功能来更新文档。
更新文档的常见方式有两种:
- 删除旧文档,然后添加新文档;
- 使用Lucene的更新文档功能直接修改文档。
使用更新文档功能可以减少磁盘IO和内存占用,提高索引性能,但需要满足一定的条件,例如文档必须存储在单独的Lucene索引段中,并且不能包含已被删除的字段等。
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
//`plan.indexIntoLucene`表示是否将文档索引到 Lucene 中
//只有当 `plan.indexIntoLucene` 或 `plan.addStaleOpToLucene` 为 `true` 时,才会将文档索引到 Lucene 中
assert plan.indexIntoLucene || plan.addStaleOpToLucene;
//更新文档的seqNo和primaryTerm;
//此处的seqNo派生自序列号服务(如果该服务位于主文档上)或现有文档的序列号(如果位于副本上)。
//此处的primaryTerm已经设置好,参考IndexShard#prepareIndex。
index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try {
if (plan.addStaleOpToLucene) {
//添加已经删除的文档
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
//判断当前操作的文档的操作序列号是否比已经存在于Lucene中的文档的最大操作序列号还要高
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
//检查给定的文档是否已经存在于索引中
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter);
}
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
} catch (Exception ex) {
if (ex instanceof AlreadyClosedException == false
&& indexWriter.getTragicException() == null
&& treatDocumentFailureAsTragicError(index) == false) {
return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo(), index.id());
} else {
throw ex;
}
}
}
Translog#add
向当前的Translog中添加一个操作(Operation),并返回该操作在Translog中的位置信息(Location)
public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
final long start = out.position();
//跳过4字节的位置以便之后写入操作大小(整型占4个字节)
out.skip(Integer.BYTES);
//将操作序列化到缓冲区中。该方法在写入操作后会计算校验和并将校验和写入`out`的当前位置
writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
final long end = out.position();
//计算操作所需字节数
final int operationSize = (int) (end - Integer.BYTES - start);
//回到开始位置
out.seek(start);
//将操作大小写入缓冲
out.writeInt(operationSize);
//回到结束位置
out.seek(end);
final BytesReference bytes = out.bytes();
//获取当前Translog实例的读锁,并确保Translog未关闭
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (operation.primaryTerm() > current.getPrimaryTerm()) {
//如果操作的primary term大于当前的primary term,抛出一个IllegalArgumentException异常
}
return current.add(bytes, operation.seqNo());
}
} catch (...) {
//如果出现异常,则关闭Translog并重新抛出异常。
...
} finally {
//释放缓冲区资源
Releasables.close(out);
}
}
TranslogWriter#add
//data 表示写入到 Translog 的操作,seqNo 表示该操作的序列号,序列号用于实现 Elasticsearch 的并发控制机制
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
long bufferedBytesBeforeAdd = this.bufferedBytes;
//检查已经缓存的字节数是否超过了阈值 forceWriteThreshold,如果超过了,就调用 writeBufferedOps 方法将已经缓存的操作写入到磁盘中
if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
}
final Translog.Location location;
//获取一个同步锁,确保当前的TranslogWriter处于开启状态
synchronized (this) {
ensureOpen();
if (buffer == null) {
buffer = new ReleasableBytesStreamOutput(bigArrays);
}
assert bufferedBytes == buffer.size();
//totalOffset表示此文件的总偏移量,包括写入文件以及缓冲区的字节数
final long offset = totalOffset;
totalOffset += data.length();
//将操作写入缓冲区
data.writeTo(buffer);
//minSeqNo 和 maxSeqNo 分别表示 Translog 文件中所有操作的最小和最大序列号
assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
//nonFsyncedSequenceNumbers记录了所有未被持久化到磁盘的操作序列号
nonFsyncedSequenceNumbers.add(seqNo);
//operationCounter记录了当前TranslogWriter缓冲区中的操作数量
operationCounter++;
//检查新写入的操作是否与已经缓存的操作序列号发生冲突。如果发生冲突,说明该操作已经被写入到 Translog 中,那么该操作就被丢弃
assert assertNoSeqNumberConflict(seqNo, data);
//新写入的操作在Translog文件中的位置信息
location = new Translog.Location(generation, offset, data.length());
bufferedBytes = buffer.size();
}
return location;
}
将缓冲区中的操作写入到磁盘中
private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
//加写锁,确保同一时间只有一个线程进行写操作
try (ReleasableLock locked = blockOnExistingWriter ? writeLock.acquire() : writeLock.tryAcquire()) {
try {
//此处 offset > getWrittenOffset()必定为True,因为add方法传递的offset参数值为Long.MAX_VALUE
if (locked != null && offset > getWrittenOffset()) {
//pollOpsToWrite方法从缓冲区中取出尚未写入的操作
writeAndReleaseOps(pollOpsToWrite());
}
} catch (Exception e) {
closeWithTragicEvent(e);
throw e;
}
}
}
将待写入的操作写入到磁盘中
private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
try (ReleasableBytesReference toClose = toWrite) {
assert writeLock.isHeldByCurrentThread();
final int length = toWrite.length();
if (length == 0) {
return;
}
//获取直接IO缓冲区
ByteBuffer ioBuffer = diskIoBufferPool.maybeGetDirectIOBuffer();
if (ioBuffer == null) {
//不使用直接缓冲区从当前线程写入,因此只需写入而不复制到IO缓冲区
BytesRefIterator iterator = toWrite.iterator();
BytesRef current;
while ((current = iterator.next()) != null) {
Channels.writeToChannel(current.bytes, current.offset, current.length, channel);
}
return;
}
BytesRefIterator iterator = toWrite.iterator();
BytesRef current;
//遍历待写入的操作数据,并将其写入 ByteBuffer 对象中,直到 ByteBuffer 对象写满
while ((current = iterator.next()) != null) {
int currentBytesConsumed = 0;
while (currentBytesConsumed != current.length) {
int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
currentBytesConsumed += nBytesToWrite;
if (ioBuffer.hasRemaining() == false) {
ioBuffer.flip();
writeToFile(ioBuffer);
ioBuffer.clear();
}
}
}
ioBuffer.flip();
//将 ByteBuffer 对象刷入磁盘
writeToFile(ioBuffer);
}
}