Elasticsearch8.5.3源码分析(6)-数据落盘过程

5,637 阅读8分钟

文档写入原理

Lucene中写入文档的过程并不会立即将文档刷新到磁盘,而是先将文档写入内存缓冲区。

ElasticSearch后台刷写线程通过调用IndexShard#flush方法来触发刷写磁盘,只有达到一定的条件时,才会将内存缓冲区中的文档刷新到内核缓冲区的segment文件中。

ElasticSearch刷盘操作可以手动触发,同时默认情况下每1秒会触发一次刷新,也可以通过refresh_interval参数进行修改。另外,如果Translog文件大小超过512M也会触发刷盘操作,此阈值可以通过index.translog.flush_threshold_size来配置。

参考:www.elastic.co/guide/en/el…

刷盘过程中,会根据配置的合并策略来决定是否对多个小的segment文件进行合并,以减少segment文件的数量并提高查询效率。

同时刷盘过程中会暂停对Shard的写入操作,以保证数据的一致性。

image.png

刷盘生成新的Segment文件的同时,还会记录一个Commit Point文件,用来恢复索引数据。

Commit Point是指在Lucene索引中每个索引操作的事务性提交点。每个提交点都包括一个或多个Segment文件,它表示一个已经被完全索引的文档集合。在每个提交点之间,可以保证索引的一致性和可恢复性。当需要进行索引恢复操作时,可以将一个已经生成的提交点文件加载到内存中,并将之前所有的事务性操作重新执行一遍,从而恢复索引的状态。。

image.png

IndexWriter是Lucene框架的类。

IndexWriter#addDocuments,IndexWriter#updateDocuments和IndexWriter#softUpdateDocuments三个方法最终都是调用IndexWriter#updateDocuments方法。

IndexWriter通过调用DocumentsWriter实现将文档写入内存缓冲区,并加入待刷新队列,等待后台线程触发刷盘操作。

核心源码分析

InternalEngine#index

public IndexResult index(Index index) throws IOException {
    //校验文档的ID字段是否为IdFieldMapper.NAME
    assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
    //检查文档操作是否需要进行流控制(throttle)
    final boolean doThrottle = index.origin().isRecovery() == false;
    //尝试获取读锁
    try (ReleasableLock releasableLock = readLock.acquire()) {
        ensureOpen();
        //检查文档操作的 seqNo 是否正确
        assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
        int reservedDocs = 0;
        //尝试获取文档 ID 的写锁和流控制(throttle)
        try (
            Releasable ignored = versionMap.acquireLock(index.uid().bytes());
            Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
        ) {
            lastWriteNanos = index.startTime();
            //获得文档操作的索引策略
            final IndexingStrategy plan = indexingStrategyForOperation(index);
            reservedDocs = plan.reservedDocs;

            final IndexResult indexResult;
            //如果索引策略中存在预处理错误(earlyResultOnPreFlightError),则返回该错误结果
            if (plan.earlyResultOnPreFlightError.isPresent()) {
                assert index.origin() == Operation.Origin.PRIMARY : index.origin();
                indexResult = plan.earlyResultOnPreFlightError.get();
                assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
            } else {
                //如果文档操作来自主分片(PRIMARY),则重新生成文档的seqNo
                if (index.origin() == Operation.Origin.PRIMARY) {
                    index = new Index(
                        index.uid(),
                        index.parsedDoc(),
                        generateSeqNoForOperationOnPrimary(index),
                        index.primaryTerm(),
                        index.version(),
                        index.versionType(),
                        index.origin(),
                        index.startTime(),
                        index.getAutoGeneratedIdTimestamp(),
                        index.isRetry(),
                        index.getIfSeqNo(),
                        index.getIfPrimaryTerm()
                    );

                    final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                    if (toAppend == false) {
                        //更新seqNo最大值。
                        advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                    }
                } else {
                    //如果文档操作来自副本分片,将提供的序列号标记为可见,并根据需要更新max_seq_no
                    markSeqNoAsSeen(index.seqNo());
                }

                assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
                //如果索引策略要求将文档写入 Lucene 中
                if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                    //将文档写入Lucene中,并返回写入结果
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    //如果索引策略不要求将文档写入Lucene中,则创建一个新的IndexResult对象返回
                    indexResult = new IndexResult(
                        plan.versionForIndexing,
                        index.primaryTerm(),
                        index.seqNo(),
                        plan.currentNotFoundOrDeleted,
                        index.id()
                    );
                }
            }
            //如果该文档不是来自于translog,则将其添加到translog中,并获得它的位置信息。
            //Translog是直接写入磁盘的,这个过程确保了如果索引操作在写入内核之前失败,这个操作仍然可以被恢复。
            if (index.origin().isFromTranslog() == false) {
                final Translog.Location location;
                if (indexResult.getResultType() == Result.Type.SUCCESS) {
                    location = translog.add(new Translog.Index(index, indexResult));
                } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    final NoOp noOp = new NoOp(
                        indexResult.getSeqNo(),
                        index.primaryTerm(),
                        index.origin(),
                        index.startTime(),
                        indexResult.getFailure().toString()
                    );
                    location = innerNoOp(noOp).getTranslogLocation();
                } else {
                    location = null;
                }
                indexResult.setTranslogLocation(location);
            }
            //如果Translog写入成功,则将将文档的版本信息存储到 versionMap 中
            if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                versionMap.maybePutIndexUnderLock(
                    index.uid().bytes(),
                    new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
                );
            }
            //将当前索引操作的序列号标记为已处理
            localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
            //事务日志地址为空,表示已经持久化至磁盘。
            if (indexResult.getTranslogLocation() == null) {
                assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                //将该序列号标记为已持久化
                localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
            }
            //记录执行索引操作所花费的时间
            indexResult.setTook(System.nanoTime() - index.startTime());
            //冻结 `indexResult` 对象,使其成为不可变的。这可以确保在多线程环境中对 `indexResult` 对象的访问是安全的
            indexResult.freeze();
            return indexResult;
        } finally {
            releaseInFlightDocs(reservedDocs);
        }
    } catch (RuntimeException | IOException e) {
       ...
    }
}

InternalEngine#indexIntoLucene

该方法的参数IndexingStrategy用于控制索引操作的行为.

addStaleOpToLucene属性表示当前的文档索引操作是否要添加到Lucene中,即使它已经被标记为过期或已被删除。

如果该变量为true,则当前的文档索引操作将被添加到Lucene中,即使它已被标记为过期或已被删除。

如果该变量为false,则会执行其他的文档索引操作,例如添加新文档或更新现有文档

useLuceneUpdateDocument属性表示是否使用 Lucene 的更新文档功能来更新文档。

更新文档的常见方式有两种:

  1. 删除旧文档,然后添加新文档;
  2. 使用Lucene的更新文档功能直接修改文档。

使用更新文档功能可以减少磁盘IO和内存占用,提高索引性能,但需要满足一定的条件,例如文档必须存储在单独的Lucene索引段中,并且不能包含已被删除的字段等。

private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
        assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
        assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
        //`plan.indexIntoLucene`表示是否将文档索引到 Lucene 中
        //只有当 `plan.indexIntoLucene` 或 `plan.addStaleOpToLucene` 为 `true` 时,才会将文档索引到 Lucene 中
        assert plan.indexIntoLucene || plan.addStaleOpToLucene;
        //更新文档的seqNo和primaryTerm;
        //此处的seqNo派生自序列号服务(如果该服务位于主文档上)或现有文档的序列号(如果位于副本上)。
        //此处的primaryTerm已经设置好,参考IndexShard#prepareIndex。
        index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
        index.parsedDoc().version().setLongValue(plan.versionForIndexing);
        try {
            if (plan.addStaleOpToLucene) {
                //添加已经删除的文档
                addStaleDocs(index.docs(), indexWriter);
            } else if (plan.useLuceneUpdateDocument) {
                //判断当前操作的文档的操作序列号是否比已经存在于Lucene中的文档的最大操作序列号还要高
                assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
                updateDocs(index.uid(), index.docs(), indexWriter);
            } else {
                //检查给定的文档是否已经存在于索引中
                assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                addDocs(index.docs(), indexWriter);
            }
            return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
        } catch (Exception ex) {
            if (ex instanceof AlreadyClosedException == false
                && indexWriter.getTragicException() == null
                && treatDocumentFailureAsTragicError(index) == false) {
                return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo(), index.id());
            } else {
                throw ex;
            }
        }
    }

Translog#add

向当前的Translog中添加一个操作(Operation),并返回该操作在Translog中的位置信息(Location)

public Location add(final Operation operation) throws IOException {
    final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
    try {
        final long start = out.position();
        //跳过4字节的位置以便之后写入操作大小(整型占4个字节)
        out.skip(Integer.BYTES);
        //将操作序列化到缓冲区中。该方法在写入操作后会计算校验和并将校验和写入`out`的当前位置
        writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
        final long end = out.position();
        //计算操作所需字节数
        final int operationSize = (int) (end - Integer.BYTES - start);
        //回到开始位置
        out.seek(start);
        //将操作大小写入缓冲
        out.writeInt(operationSize);
        //回到结束位置 
        out.seek(end);
        final BytesReference bytes = out.bytes();
        //获取当前Translog实例的读锁,并确保Translog未关闭
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            if (operation.primaryTerm() > current.getPrimaryTerm()) {
                //如果操作的primary term大于当前的primary term,抛出一个IllegalArgumentException异常
            }
            return current.add(bytes, operation.seqNo());
        }
    } catch (...) {
        //如果出现异常,则关闭Translog并重新抛出异常。
        ...
    } finally {
        //释放缓冲区资源
        Releasables.close(out);
    }
}

TranslogWriter#add

//data 表示写入到 Translog 的操作,seqNo 表示该操作的序列号,序列号用于实现 Elasticsearch 的并发控制机制
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
    long bufferedBytesBeforeAdd = this.bufferedBytes;
    //检查已经缓存的字节数是否超过了阈值 forceWriteThreshold,如果超过了,就调用 writeBufferedOps 方法将已经缓存的操作写入到磁盘中
    if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
        writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
    }

    final Translog.Location location;
    //获取一个同步锁,确保当前的TranslogWriter处于开启状态
    synchronized (this) {
        ensureOpen();
        if (buffer == null) {
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
        assert bufferedBytes == buffer.size();
        //totalOffset表示此文件的总偏移量,包括写入文件以及缓冲区的字节数
        final long offset = totalOffset;
        totalOffset += data.length();
        //将操作写入缓冲区
        data.writeTo(buffer);
        //minSeqNo 和 maxSeqNo 分别表示 Translog 文件中所有操作的最小和最大序列号
        assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
        assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
        
        minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
        maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
        //nonFsyncedSequenceNumbers记录了所有未被持久化到磁盘的操作序列号
        nonFsyncedSequenceNumbers.add(seqNo);
        //operationCounter记录了当前TranslogWriter缓冲区中的操作数量
        operationCounter++;
        //检查新写入的操作是否与已经缓存的操作序列号发生冲突。如果发生冲突,说明该操作已经被写入到 Translog 中,那么该操作就被丢弃
        assert assertNoSeqNumberConflict(seqNo, data);
        //新写入的操作在Translog文件中的位置信息
        location = new Translog.Location(generation, offset, data.length());
        bufferedBytes = buffer.size();
    }

    return location;
}

将缓冲区中的操作写入到磁盘中

private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
    //加写锁,确保同一时间只有一个线程进行写操作
    try (ReleasableLock locked = blockOnExistingWriter ? writeLock.acquire() : writeLock.tryAcquire()) {
        try {
            //此处 offset > getWrittenOffset()必定为True,因为add方法传递的offset参数值为Long.MAX_VALUE
            if (locked != null && offset > getWrittenOffset()) {
                //pollOpsToWrite方法从缓冲区中取出尚未写入的操作
                writeAndReleaseOps(pollOpsToWrite());
            }
        } catch (Exception e) {
            closeWithTragicEvent(e);
            throw e;
        }
    }
}

将待写入的操作写入到磁盘中

private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
    try (ReleasableBytesReference toClose = toWrite) {
        assert writeLock.isHeldByCurrentThread();
        final int length = toWrite.length();
        if (length == 0) {
            return;
        }
        //获取直接IO缓冲区
        ByteBuffer ioBuffer = diskIoBufferPool.maybeGetDirectIOBuffer();
        if (ioBuffer == null) {
            //不使用直接缓冲区从当前线程写入,因此只需写入而不复制到IO缓冲区
            BytesRefIterator iterator = toWrite.iterator();
            BytesRef current;
            while ((current = iterator.next()) != null) {
                Channels.writeToChannel(current.bytes, current.offset, current.length, channel);
            }
            return;
        }
        BytesRefIterator iterator = toWrite.iterator();
        BytesRef current;
        //遍历待写入的操作数据,并将其写入 ByteBuffer 对象中,直到 ByteBuffer 对象写满
        while ((current = iterator.next()) != null) {
            int currentBytesConsumed = 0;
            while (currentBytesConsumed != current.length) {
                int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
                ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
                currentBytesConsumed += nBytesToWrite;
                if (ioBuffer.hasRemaining() == false) {
                    ioBuffer.flip();
                    writeToFile(ioBuffer);
                    ioBuffer.clear();
                }
            }
        }
        ioBuffer.flip();
        //将 ByteBuffer 对象刷入磁盘
        writeToFile(ioBuffer);
    }
}