背景
Lucene中删除文档主要有两种途径:
- 直接删除:直接根据term或者query条件进行删除
- IndexWriter#deleteDocuments(Query...)
- IndexWriter#deleteDocuments(Term...)
- 间接删除:lucene中更新文档,是先添加新文档,再删除满足term条件的文档
- IndexWriter#updateDocument(Term, doc)
- IndexWriter#updateDocuments(Term, docs)
文档删除的过程其实比较复杂,需要解决的问题有:
- 怎么确定删除条件的作用范围?范围又分为两种,一个是作用在哪些segment,另一个是作用在segment中的doc范围。
- 已经flush的segment和还未flush的segment删除逻辑分别怎么处理?
- 删除结果的持久化时机?持久化的时间点是不是和可见的时间点相同?
Lucene中还有一种操作,和文档删除的处理逻辑很相似:DocValues的更新。这个我们本文一并介绍。
确定删除的范围
删除范围纯属设计层面的东西,不需要详细了解源码也可以大概来说清楚。因此,我们先看下删除的范围是怎么确定的?
先简单介绍几个相关的组件,详细的介绍见后文:
- DeleteSlice:一个链表,链表中的节点都是删除条件,不同的删除条件是不同的Node,比如根据term删除的TermNode,根据query删除的QueryNode等。注意:所有DeleteSlice头结点指向的节点是个哨兵节点。
- IndexWriter:索引管理的入口,提供了新增文档,删除文档,更新文档等接口,新增文档或者更新文档的时候会选择一个DocumentsWriterPerThread来执行,没有的话会创建。IndexWriter持有全局的DeleteSlice,这个队列中的删除条件是作用于所有已经存在的segment中的所有doc。
- DocumentsWriterPerThread:对应了一个未flush的segment,DocumentsWriterPerThread维护了一个私有的DeleteSlice队列,其实是全局DeleteSlice中的一段,这个私有DeleteSlice中所有的删除条件作用范围就是DocumentsWriterPerThread对应的未flush的segment,会在flush的时候处理。而删除条件作用的doc范围就是删除条件加入队列时候DocumentsWriterPerThread中已有的doc。
下面我举个例子,需要注意的是为了方便描述队列的变化过程,我简化了队列的维护逻辑。
初始状态
如上图所示,初始状态下,也就是初始化IndexWriter,假设我们现在是重新打开一个索引,索引中已经有两个segment。当前的全局DeleteSlice是个空链表,但是不管全局DeleteSlice怎么变,全局DeleteSlice中的所有删除条件作用范围都包括segment0和segment1。
通过IndexWriter删除文档
如上图所示,当通过IndexWriter的删除文档接口删除文档时,会把删除的条件加入到全局DeleteSlice。
通过IndexWriter新增文档
当通过IndexWriter的新增文档的时候,它会创建一个DWPT1来处理,DWPT1的私有DeleteSlice头尾都指向全局DeleteSlice的尾结点。注意,我前面说了,DeleteSlice的头结点都是一个哨兵节点,因此当前全局DeleteSlice的尾结点只是私有DeleteSlice的哨兵节点。
通过IndexWriter删除文档
当通过IndexWriter删除文档时,把删除条件加入全局DeleteSlice,注意DWPT1的私有DeleteSlice的tail也会更新,这里我们简化,当做立即也更新,实际上更新的时间点有3个:新增,更新或者flush。还有一个需要注意的是,此时新增的删除条件对于DWPT1中doc的作用范围,就是目前的最大的docID。
通过IndexWriter更新文档
当通过IndexWriter更新文档时,假设这时候IndexWriter还是通过DWPT1来进行更新,DWPT1会新增文档,然后把删除条件加入全局DeleteSlice,更新私有DeleteSlice的tail。
通过IndexWriter新增文档
当通过IndexWriter新增文档时,假设这时候IndexWriter又创建了一个DWPT2来处理,DWPT2的私有DeleteSlice的head和tail指向了当前全局DeleteSlice的尾结点。
通过IndexWriter更新文档
到这一步我也不用分析了,相信大家都已经比较清楚了。
假设到这一步了,现在要进行全局flush了,则全局DeleteSlice的所有删除条件都作用于segment0和segment1。DWPT1和DWPT2私有的DeleteSlice只作用于DWPT1和DWPT2各自即将生成的segment。
至于删除条件怎么作用的,怎么持久化删除信息的,我们后面的源码中再介绍。
相关知识
BufferedUpdates
BufferedUpdates是存储未持久化的删除和更新数据,其实就是个缓存。Lucene支持根据term删除文档,也支持根据query删除文档,还支持DocValues的更新,BufferedUpdates就是用来存储这些更新和删除的信息,在flush的时候会转成FrozenBufferedUpdates,最终进行持久化。
成员变量
BufferedUpdates的核心成员变量,主要是几个用来进行缓存的map:
// 根据term匹配条件删除的个数
final AtomicInteger numTermDeletes = new AtomicInteger();
// docValue更新的个数
final AtomicInteger numFieldUpdates = new AtomicInteger();
// key是删除的term条件,value是生效的docId上界
final Map<Term, Integer> deleteTerms = new HashMap<>();
// key是删除的query条件,value是生效的docId上界
final Map<Query, Integer> deleteQueries = new HashMap<>();
// key是字段名称,value是对应字段的docValue的更新信息
final Map<String, FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
核心方法
主要是一些把相关的删除和更新的信息加入对应的缓存中。
// 根据query删除
public void addQuery(Query query, int docIDUpto) {
Integer current = deleteQueries.put(query, docIDUpto);
// 如果是新增的,则需要更新占用的内存大小
if (current == null) {
bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
}
}
// 根据term删除
public void addTerm(Term term, int docIDUpto) {
Integer current = deleteTerms.get(term);
if (current != null && docIDUpto < current) { // 如果已经有了按term删除的,并且有效的docID上限更大,则忽略当前的删除动作
return;
}
deleteTerms.put(term, Integer.valueOf(docIDUpto));
numTermDeletes.incrementAndGet();
if (current == null) { // 新增的需要更新内存占用信息
termsBytesUsed.addAndGet(
BYTES_PER_DEL_TERM + term.bytes.length + (Character.BYTES * term.field().length()));
}
}
// NumericDocValues的更新
void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
// 获取字段的 FieldUpdatesBuffer ,如果是这个字段第一次更新docValues,则创建一个
FieldUpdatesBuffer buffer =
fieldUpdates.computeIfAbsent(
update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
if (update.hasValue) {
buffer.addUpdate(update.term, update.getValue(), docIDUpto);
} else {
buffer.addNoValue(update.term, docIDUpto);
}
numFieldUpdates.incrementAndGet();
}
// BinaryDocValues的更新
void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
// 获取字段的 FieldUpdatesBuffer ,如果是这个字段第一次更新docValues,则创建一个
FieldUpdatesBuffer buffer =
fieldUpdates.computeIfAbsent(
update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
if (update.hasValue) {
buffer.addUpdate(update.term, update.getValue(), docIDUpto);
} else {
buffer.addNoValue(update.term, docIDUpto);
}
numFieldUpdates.incrementAndGet();
}
FrozenBufferedUpdates
FrozenBufferedUpdates和BufferedUpdates最大的区别有两个:
- FrozenBufferedUpdates不支持新增了,所以把删除信息用更高效的的存储结构进行存储
- BufferedUpdates中存储删除信息还只是原始条件,但是FrozenBufferedUpdates中提供了把条件转成匹配docID的方法
成员变量
// 存储所有的term条件,term按序存储
final PrefixCodedTerms deleteTerms;
// 根据query删除的信息
final Query[] deleteQueries;
final int[] deleteQueryLimits;
// 用来判断是否所有删除和更新信息都完成apply了
public final CountDownLatch applied = new CountDownLatch(1);
// 只能由一个线程进行apply
private final ReentrantLock applyLock = new ReentrantLock();
// docValues的更新信息
private final Map<String, FieldUpdatesBuffer> fieldUpdates;
// 更新和删除的总数
public long totalDelCount;
// docValues更新总数
private final int fieldUpdatesCount;
// term删除总数
final int numTermDeletes;
// 这个是用来判断当前FrozenBufferedUpdates作用的segment的范围,segment中也有一个bufferedDeletesGen
// FrozenBufferedUpdates起作用的segment,是bufferedDeletesGen >= delGen
private long delGen = -1;
// 如果FrozenBufferedUpdates内的更新和删除信息都是一个段私有的,则privateSegment就是这个段的信息
// 如果是段私有的,则只有根据query删除,和docValues更新的信息,因为根据term删除在org.apache.lucene.index.FreqProxTermsWriter#applyDeletes已经处理了
final SegmentCommitInfo privateSegment;
核心方法
构造函数
在构造函数中主要做了以下几件事:
- 把term删除条件中的term排序之后使用前缀编码的数据进行压缩存储
- 把map结构的query删除信息转成数组存储,因为map中可能是有闲置空间的
- 执行FieldUpdatesBuffer的finish方法
public FrozenBufferedUpdates(
InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
this.infoStream = infoStream;
this.privateSegment = privateSegment;
// 获取所有根据term删除的的term集合
Term[] termsArray = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]);
// 对term进行排序,先按field排序,再按term value排序
ArrayUtil.timSort(termsArray);
// 从名字可以看出,就是前缀编码,也就是对同一个field的term的value进行前缀编码
PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder();
for (Term term : termsArray) {
builder.add(term);
}
deleteTerms = builder.finish();
// 把 BufferedUpdates 中的deleteQueries转成数组存储
// 因为我们知道map的大小有可能是有浪费的,转成数组就是按需使用空间
deleteQueries = new Query[updates.deleteQueries.size()];
deleteQueryLimits = new int[updates.deleteQueries.size()];
int upto = 0;
for (Map.Entry<Query, Integer> ent : updates.deleteQueries.entrySet()) {
deleteQueries[upto] = ent.getKey();
deleteQueryLimits[upto] = ent.getValue();
upto++;
}
// 在flush之前,所有的FieldUpdatesBuffer需要进行finish
updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish);
this.fieldUpdates = Map.copyOf(updates.fieldUpdates);
this.fieldUpdatesCount = updates.numFieldUpdates.get();
bytesUsed =
(int)
((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY)
+ updates.fieldUpdatesBytesUsed.get());
numTermDeletes = updates.numTermDeletes.get();
}
apply所有的更新和删除
真正apply所有更新和删除信息的入口,在里面分别执行了term删除,query删除和DocValues更新。
long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
// FrozenBufferedUpdates是加入BufferedUpdatesStream中,等待apply
// BufferedUpdatesStream为FrozenBufferedUpdates分配delGen,确定apply生效的segment
if (delGen == -1) {
throw new IllegalArgumentException(
"gen is not yet set; call BufferedUpdatesStream.push first");
}
totalDelCount += applyTermDeletes(segStates);
totalDelCount += applyQueryDeletes(segStates);
totalDelCount += applyDocValuesUpdates(segStates);
return totalDelCount;
}
apply根据term的删除
这个方法主要是对已经存在的segment的处理。如果是还未flush的段的term删除,在flush的时候就已经被处理了,这部分逻辑在org.apache.lucene.index.FreqProxTermsWriter#applyDeletes。
apply term删除条件,就是遍历所有的segment,读取segment的对应term的倒排信息,获取docID列表,和term删除条件的docUpTo对比,满足条件的就删除。删除的方式就是把docID记录到segment对应的ReadersAndUpdates的pendingDeletes中,最后持久化的时候会生成liv文件。
private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (deleteTerms.size() == 0) {
return 0;
}
// 如果deleteTerms非空,则说明这些删除信息不是段私有的
assert privateSegment == null;
long startNS = System.nanoTime();
long delCount = 0;
// 遍历所有的段
for (BufferedUpdatesStream.SegmentState segState : segStates) {
// 不属于删除的范围
if (segState.delGen > delGen) {
continue;
}
if (segState.rld.refCount() == 1) { // merge的情况,以后介绍
continue;
}
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {
// 获取term的倒排信息,得到docID列表
final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
// 删除所有的匹配的文档
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// rld是 ReadersAndUpdates,其中有个pendingDeletes用来记录存活的docID
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
}
return delCount;
}
apply根据query的删除
query删除也是遍历所有的segment,然后执行query查询找到匹配的docID,和query删除条件的docUpTo对比,满足条件的就删除,删除的方式和term删除一样。
这里涉及到一些query查询的组件,我们以后再详细介绍。
private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates)
throws IOException {
if (deleteQueries.length == 0) {
return 0;
}
long startNS = System.nanoTime();
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
continue;
}
if (segState.rld.refCount() == 1) { // merge的情况,以后介绍
continue;
}
final LeafReaderContext readerContext = segState.reader.getContext();
for (int i = 0; i < deleteQueries.length; i++) {
Query query = deleteQueries[i];
int limit;
if (delGen == segState.delGen) {
limit = deleteQueryLimits[i];
} else {
limit = Integer.MAX_VALUE;
}
final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
searcher.setQueryCache(null);
query = searcher.rewrite(query);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
final Scorer scorer = weight.scorer(readerContext);
if (scorer != null) {
final DocIdSetIterator it = scorer.iterator();
if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
int docID;
while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (segState.rld.sortMap.newToOld(docID) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
} else {
int docID;
while ((docID = it.nextDoc()) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
}
}
return delCount;
}
appy所有DocValues的更新
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates)
throws IOException {
if (fieldUpdates.isEmpty()) {
return 0;
}
long startNS = System.nanoTime();
long updateCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
continue;
}
if (segState.rld.refCount() == 1) {
continue;
}
final boolean isSegmentPrivateDeletes = privateSegment != null;
if (fieldUpdates.isEmpty() == false) {
updateCount +=
applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes);
}
}
return updateCount;
}
private static long applyDocValuesUpdates(
BufferedUpdatesStream.SegmentState segState,
Map<String, FieldUpdatesBuffer> updates,
long delGen,
boolean segmentPrivateDeletes)
throws IOException {
long updateCount = 0;
final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
String updateField = fieldUpdate.getKey();
DocValuesFieldUpdates dvUpdates = null;
FieldUpdatesBuffer value = fieldUpdate.getValue();
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
TermDocsIterator termDocsIterator =
new TermDocsIterator(segState.reader, iterator.isSortedTerms());
while ((bufferedUpdate = iterator.next()) != null) {
final DocIdSetIterator docIdSetIterator =
termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
if (docIdSetIterator != null) {
final int limit;
if (delGen == segState.delGen) {
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
dvUpdates =
new NumericDocValuesFieldUpdates.SingleValueNumericDocValuesFieldUpdates(
delGen, updateField, segState.reader.maxDoc(), value.getNumericValue(0));
} else {
dvUpdates =
new NumericDocValuesFieldUpdates(
delGen,
updateField,
value.getMinNumeric(),
value.getMaxNumeric(),
segState.reader.maxDoc());
}
} else {
dvUpdates =
new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
}
resolvedUpdates.add(dvUpdates);
}
final IntConsumer docIdConsumer;
final DocValuesFieldUpdates update = dvUpdates;
if (bufferedUpdate.hasValue == false) {
docIdConsumer = doc -> update.reset(doc);
} else if (isNumeric) {
docIdConsumer = doc -> update.add(doc, longValue);
} else {
docIdConsumer = doc -> update.add(doc, binaryValue);
}
final Bits acceptDocs = segState.rld.getLiveDocs();
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this
// case:
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
} else {
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
if (acceptDocs == null || acceptDocs.get(doc)) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
}
}
}
// now freeze & publish:
for (DocValuesFieldUpdates update : resolvedUpdates) {
if (update.any()) {
update.finish();
segState.rld.addDVUpdate(update);
}
}
return updateCount;
}
BufferedUpdatesStream
BufferedUpdatesStream用来存储所有的FrozenBufferedUpdates,所有的FrozenBufferedUpdates都需要加入BufferedUpdatesStream获取delGen,从而能在apply的时候确定作用的segment范围。
核心方法
添加一个FrozenBufferedUpdates
synchronized long push(FrozenBufferedUpdates packet) {
// 为FrozenBufferedUpdates分配delGen
packet.setDelGen(nextGen++);
updates.add(packet);
numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed);
return packet.delGen();
}
apply所有的FrozenBufferedUpdates
void waitApplyAll(IndexWriter writer) throws IOException {
Set<FrozenBufferedUpdates> waitFor;
synchronized (this) {
waitFor = new HashSet<>(updates);
}
waitApply(waitFor, writer);
}
private void waitApply(Set<FrozenBufferedUpdates> waitFor, IndexWriter writer)
throws IOException {
long startNS = System.nanoTime();
int packetCount = waitFor.size();
if (waitFor.isEmpty()) {
return;
}
ArrayList<FrozenBufferedUpdates> pendingPackets = new ArrayList<>();
long totalDelCount = 0;
for (FrozenBufferedUpdates packet : waitFor) {
if (writer.tryApply(packet) == false) {
pendingPackets.add(packet);
}
totalDelCount += packet.totalDelCount;
}
for (FrozenBufferedUpdates packet : pendingPackets) {
writer.forceApply(packet);
}
}
PendingDeletes
PendingDeletes是用来存储segment的删除信息的。
需要强调的是,PendingDeletes存储的是segment中所有的删除信息,包括已经持久化或者还未持久化的,所以liv文件的更新是全量的,最新的liv文件包含了旧的liv文件的删除信息,因此,如果是在KeepOnlyLastCommitDeletionPolicy索引删除策略的情况下,只会保留最新版本的liv文件。
成员变量
// 对应段的提交信息
protected final SegmentCommitInfo info;
// 只读的,如果段中所有文档都是存活的或者是未初始化则是null
private Bits liveDocs;
// 可写的用来记录存活的文档,有新的删除需要更新才进行初始化
private FixedBitSet writeableLiveDocs;
// 待删除的文档个数
protected int pendingDeleteCount;
// 是否初始化过
boolean liveDocsInitialized;
核心方法
新增一个删除的docID
getMutableBits中看到,writeableLiveDocs优先是基于liveDocs来进行初始化的,因此当前的删除信息总是包含了旧的删除信息。
// 删除文档
// 如果文档确实被删除了,则返回true
// 如果文档已经删除过了,则返回false
boolean delete(int docID) throws IOException {
// 获取可写的存活文档位图记录器
FixedBitSet mutableBits = getMutableBits();
// 判断当前文档是不是存活的
final boolean didDelete = mutableBits.get(docID);
if (didDelete) { // 如果要删除的文档是存活的,则执行删除操作
mutableBits.clear(docID);
pendingDeleteCount++;
}
return didDelete;
}
// 获取writeableLiveDocs
protected FixedBitSet getMutableBits() {
// 如果writeableLiveDocs还没有被初始化
if (writeableLiveDocs == null) {
if (liveDocs != null) { // 如果段当前已经存在删除的文档,则直接拷贝一份
writeableLiveDocs = FixedBitSet.copyOf(liveDocs);
} else { // 如果当前段还没有文档被删除,则新建一个表示所有文档都存活的位图
writeableLiveDocs = new FixedBitSet(info.info.maxDoc());
writeableLiveDocs.set(0, info.info.maxDoc());
}
// 注意,这个方法使得 writeableLiveDocs 和 liveDocs底层存储信息的数组是同一个,
// 因此writeableLiveDocs的更新也会反映在liveDocs中
liveDocs = writeableLiveDocs.asReadOnlyBits();
}
return writeableLiveDocs;
}
生成liv文件
持久化的时候就是持久化当前的liveDocs,也就是最新的删除信息。
boolean writeLiveDocs(Directory dir) throws IOException {
if (pendingDeleteCount == 0) { // 没有待删除的文档
return false;
}
Bits liveDocs = this.liveDocs;
// 把Directory封装成TrackingDirectoryWrapper,是为了如果在生成liv文件的过程中,
// 如果发生任何异常,则把过程中生成的文件都删除
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
boolean success = false;
try {
Codec codec = info.info.getCodec();
codec
.liveDocsFormat()
.writeLiveDocs(liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
success = true;
} finally {
if (!success) {
// 如果落盘失败,则删除所有新生成的文件
for (String fileName : trackingDir.getCreatedFiles()) {
IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
}
}
}
// 落盘成功,pendingDeleteCount 设为0,表示当前没有未落盘的删除信息,跟方法开头的判断先呼应
dropChanges();
return true;
}
FieldUpdatesBuffer
docvalues的更新都是通过term查询来获取更新的doc集合。
工具类
DocValuesUpdate
DocValuesUpdate封装了一次DocValues更新的全部信息:
// DocValues的类型,目前支持更新的只有两种:NUMERIC和BINARY
final DocValuesType type;
// 查找更新文档的term条件
final Term term;
// 要更新的字段
final String field;
// 当前更新范围的docID上界
final int docIDUpTo;
// 是否是有值更新,如果是无值的,相当于把旧值清除
final boolean hasValue;
成员变量
// 记录当前FieldUpdatesBuffer的内存占用
private final Counter bytesUsed;
// 记录第几次的更新操作
private int numUpdates = 1;
// 存储更新的term条件中的value,把它当成一个数组
private final BytesRefArray termValues;
// 可以获取按照指定的排序规则排好序的下标列表
private BytesRefArray.SortState termSortState;
// 存储binary docvalues更新的值
private final BytesRefArray byteValues;
// 每次更新操作对应的docID上限
private int[] docsUpTo;
// 存储numeric docvalues更新的值
private long[] numericValues;
// 标记对应的更新是否有值,如果是无值更新,相当于把满足term条件的指定的字段的docvalue清空
private FixedBitSet hasValues;
// 记录最大值和最小值是为了优化存储空间,详细可以看之前关于numericDocValues的介绍
private long maxNumeric = Long.MIN_VALUE;
private long minNumeric = Long.MAX_VALUE;
// 每次更新对应的字段
private String[] fields;
// 是否是对NumericDocValues的更新
private final boolean isNumeric;
// 是否已经结束了,等待flush
private boolean finished = false;
核心方法
构造方法
这里需要注意的是,FieldUpdatesBuffer是一个字段一个的,FieldUpdatesBuffer会以字段的第一次更新的信息来进行初始化,也就是构造函数中的initialValue。fields,docsUpTo都会以initialValue中的对应信息作为数组的第一个元素,之后采用延迟的扩容方式,直到与第一个元素不相同的更新出现,才会把所有的元素都补齐。如果initialValue是有值更新,则记录是否存在值的hasValues不初始化,它是延迟初始化,直到碰到一个无值更新为止,才进行初始化。
这里可能让大家云里雾里,后面介绍新增更新的核心方法的时候就理解了,这里只留个印象就好。
private FieldUpdatesBuffer(
Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) {
this.bytesUsed = bytesUsed;
this.bytesUsed.addAndGet(SELF_SHALLOW_SIZE);
termValues = new BytesRefArray(bytesUsed);
termValues.append(initialValue.term.bytes);
fields = new String[] {initialValue.term.field};
bytesUsed.addAndGet(sizeOfString(initialValue.term.field));
docsUpTo = new int[] {docUpTo};
// 如果是无值更新,才进行 hasValues 的初始化
if (initialValue.hasValue == false) {
hasValues = new FixedBitSet(1);
bytesUsed.addAndGet(hasValues.ramBytesUsed());
}
this.isNumeric = isNumeric;
byteValues = isNumeric ? null : new BytesRefArray(bytesUsed);
}
// NumericDocValues更新的构造函数
FieldUpdatesBuffer(
Counter bytesUsed, DocValuesUpdate.NumericDocValuesUpdate initialValue, int docUpTo) {
this(bytesUsed, initialValue, docUpTo, true);
if (initialValue.hasValue()) {
numericValues = new long[] {initialValue.getValue()};
maxNumeric = minNumeric = initialValue.getValue();
} else { // 无值更新默认是0
numericValues = new long[] {0};
}
bytesUsed.addAndGet(Long.BYTES);
}
// BinaryDocValues更新的构造函数
FieldUpdatesBuffer(
Counter bytesUsed, DocValuesUpdate.BinaryDocValuesUpdate initialValue, int docUpTo) {
this(bytesUsed, initialValue, docUpTo, false);
if (initialValue.hasValue()) {
byteValues.append(initialValue.getValue());
}
}
记录更新的信息
void add(String field, int docUpTo, int ord, boolean hasValue) {
// fields数组判断是否需要扩容,2个条件:
// 1.当前field和第一个field不一样
// 2.已经扩容过一次了,也就是fields.length != 1
if (fields[0].equals(field) == false || fields.length != 1) {
if (fields.length <= ord) {
String[] array = ArrayUtil.grow(fields, ord + 1);
if (fields.length == 1) { // 如果是第一次扩容则从下标1到ord的值和fields[0]相同
Arrays.fill(array, 1, ord, fields[0]);
}
bytesUsed.addAndGet(
(array.length - fields.length) * RamUsageEstimator.NUM_BYTES_OBJECT_REF);
fields = array;
}
if (field != fields[0]) {
bytesUsed.addAndGet(sizeOfString(field));
}
fields[ord] = field;
}
// docsUpTo数组判断是否需要扩容,2个条件:
// 1.当前docUpTo和第一个docUpTo不一样
// 2.已经扩容过一次了,也就是docsUpTo.length != 1
if (docsUpTo[0] != docUpTo || docsUpTo.length != 1) {
if (docsUpTo.length <= ord) {
int[] array = ArrayUtil.grow(docsUpTo, ord + 1);
if (docsUpTo.length == 1) { // 如果是第一次扩容则从下标1到ord的值和docsUpTo[0]相同
Arrays.fill(array, 1, ord, docsUpTo[0]);
}
bytesUsed.addAndGet((array.length - docsUpTo.length) * Integer.BYTES);
docsUpTo = array;
}
docsUpTo[ord] = docUpTo;
}
// hasValues判断是否需要扩容,2个条件:
// 1.当前的更新是无值更新
// 2.hasValues已经初始化过了
if (hasValue == false || hasValues != null) {
if (hasValues == null) { // 第一次初始化,说明碰到的是第一个无值更新的,从0到ord都是有值更新
hasValues = new FixedBitSet(ord + 1);
hasValues.set(0, ord);
bytesUsed.addAndGet(hasValues.ramBytesUsed());
} else if (hasValues.length() <= ord) {
FixedBitSet fixedBitSet =
FixedBitSet.ensureCapacity(hasValues, ArrayUtil.oversize(ord + 1, 1));
bytesUsed.addAndGet(fixedBitSet.ramBytesUsed() - hasValues.ramBytesUsed());
hasValues = fixedBitSet;
}
if (hasValue) { // 按实际情况进行设置
hasValues.set(ord);
}
}
}
记录新的条件
private int append(Term term) {
termValues.append(term.bytes);
return numUpdates++;
}
新增无值更新
void addNoValue(Term term, int docUpTo) {
final int ord = append(term);
add(term.field, docUpTo, ord, false);
}
新增有值的NumericDocValues更新
void addUpdate(Term term, long value, int docUpTo) {
// 记录更新的条件,并获取当前是第几次更新
final int ord = append(term);
String field = term.field;
add(field, docUpTo, ord, true);
minNumeric = Math.min(minNumeric, value);
maxNumeric = Math.max(maxNumeric, value);
// numericValues数组判断是否需要扩容,2个条件:
// 1.当前value和第一个value不一样
// 2.已经扩容过一次了,也就是numericValues.length != 1
if (numericValues[0] != value || numericValues.length != 1) {
if (numericValues.length <= ord) {
long[] array = ArrayUtil.grow(numericValues, ord + 1);
if (numericValues.length == 1) { // 如果是第一次扩容则从下标1到ord的值和numericValues[0]相同
Arrays.fill(array, 1, ord, numericValues[0]);
}
bytesUsed.addAndGet((array.length - numericValues.length) * Long.BYTES);
numericValues = array;
}
numericValues[ord] = value;
}
}
新增有值的BinaryDocValues更新
void addUpdate(Term term, BytesRef value, int docUpTo) {
final int ord = append(term);
byteValues.append(value);
add(term.field, docUpTo, ord, true);
}
结束添加
结束添加的时候处理了一种针对NumericDocValues更新的特殊情况,同时满足以下三个条件:
- 更新的值都相等
- 所有的更新都是有值更新
- 所有的term条件是同一个字段
针对这种特殊情况,Lcuene对所有缓存的待更新的信息进行了排序,先按term条件的值升序排序,再按docsUpTo降序,这样在处理的时候可以忽略一部分的更新,我们举个例子。
假设有以下更新列表:
term | docsUpTo |
---|---|
d | 3 |
c | 4 |
a | 5 |
d | 9 |
a | 1 |
经过排序之后,得到:
term | docsUpTo |
---|---|
a | 5 |
a | 1 |
c | 4 |
d | 9 |
d | 3 |
按序处理上面的列表,我们知道(a,5)已经包含了(a,1),所以(a,1)不用处理了。同理,(d,9)已经包含了(d,3),所以(d,3)不用处理了。
void finish() {
if (finished) {
throw new IllegalStateException("buffer was finished already");
}
finished = true;
// 如果是对NumericDocValues更新,并且更新的值都相同,并且都是有值更新,并且term条件是同一个字段,则需要排序
final boolean sortedTerms = hasSingleValue() && hasValues == null && fields.length == 1;
if (sortedTerms) {
// 先按term升序排序,再按docsUpTo降序
termSortState =
termValues.sort(
Comparator.naturalOrder(),
(i1, i2) ->
Integer.compare(
docsUpTo[getArrayIndex(docsUpTo.length, i2)],
docsUpTo[getArrayIndex(docsUpTo.length, i1)]));
bytesUsed.addAndGet(termSortState.ramBytesUsed());
}
}
DocumentsWriterDeleteQueue
内部类
DeleteSlice
完整的删除链表是由多个线程共享的,每个线程自己的删除链表就是一个slice。这个每个线程就对应了一个
static class DeleteSlice {
Node<?> sliceHead; // 哨兵头节点,该节点的信息不会被使用
Node<?> sliceTail;
DeleteSlice(Node<?> currentTail) {
sliceHead = sliceTail = currentTail;
}
void apply(BufferedUpdates del, int docIDUpto) {
if (sliceHead == sliceTail) {
return;
}
Node<?> current = sliceHead;
do {
current = current.next;
// 把删除信息加入del缓存中
current.apply(del, docIDUpto);
} while (current != sliceTail);
reset();
}
void reset() {
sliceHead = sliceTail;
}
/**
* Returns <code>true</code> iff the given node is identical to the slices tail, otherwise
* <code>false</code>.
*/
boolean isTail(Node<?> node) {
return sliceTail == node;
}
/**
* Returns <code>true</code> iff the given item is identical to the item hold by the slices
* tail, otherwise <code>false</code>.
*/
boolean isTailItem(Object object) {
return sliceTail.item == object;
}
boolean isEmpty() {
return sliceHead == sliceTail;
}
}
节点Node
static class Node<T> {
volatile Node<?> next;
// 节点的信息
final T item;
Node(T item) {
this.item = item;
}
void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
throw new IllegalStateException("sentinel item must never be applied");
}
boolean isDelete() {
return true;
}
}
private static final class TermNode extends Node<Term> {
TermNode(Term term) {
super(term);
}
@Override
void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm(item, docIDUpto);
}
@Override
public String toString() {
return "del=" + item;
}
}
private static final class QueryArrayNode extends Node<Query[]> {
QueryArrayNode(Query[] query) {
super(query);
}
@Override
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (Query query : item) {
bufferedUpdates.addQuery(query, docIDUpto);
}
}
}
private static final class TermArrayNode extends Node<Term[]> {
TermArrayNode(Term[] term) {
super(term);
}
@Override
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (Term term : item) {
bufferedUpdates.addTerm(term, docIDUpto);
}
}
@Override
public String toString() {
return "dels=" + Arrays.toString(item);
}
}
private static final class DocValuesUpdatesNode extends Node<DocValuesUpdate[]> {
DocValuesUpdatesNode(DocValuesUpdate... updates) {
super(updates);
}
// 只支持numeric和binary docValues的更新
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (DocValuesUpdate update : item) {
switch (update.type) {
case NUMERIC:
bufferedUpdates.addNumericUpdate((NumericDocValuesUpdate) update, docIDUpto);
break;
case BINARY:
bufferedUpdates.addBinaryUpdate((BinaryDocValuesUpdate) update, docIDUpto);
break;
case NONE:
case SORTED:
case SORTED_SET:
case SORTED_NUMERIC:
default:
throw new IllegalArgumentException(
update.type + " DocValues updates not supported yet!");
}
}
}
@Override
boolean isDelete() {
return false;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("docValuesUpdates: ");
if (item.length > 0) {
sb.append("term=").append(item[0].term).append("; updates: [");
for (DocValuesUpdate update : item) {
sb.append(update.field).append(':').append(update.valueToString()).append(',');
}
sb.setCharAt(sb.length() - 1, ']');
}
return sb.toString();
}
}
关键成员变量
// 在flush之前,所有的DocumentsWriterPerThread是共享的
private volatile Node<?> tail;
// 全局的删除链表,作用范围是当前所有的segment。
private final DeleteSlice globalSlice;
// flush的时候,globalSlice中所有的数据已经在globalBufferedUpdates中
private final BufferedUpdates globalBufferedUpdates;
核心方法
新增一个节点
注意是同步的,因为DocumentsWriterDeleteQueue是全局共享的。
synchronized long add(Node<?> newNode) {
ensureOpen();
tail.next = newNode;
this.tail = newNode;
return getNextSequenceNumber();
}
新增更新和删除节点
long addDelete(Query... queries) {
long seqNo = add(new QueryArrayNode(queries));
tryApplyGlobalSlice();
return seqNo;
}
long addDelete(Term... terms) {
long seqNo = add(new TermArrayNode(terms));
tryApplyGlobalSlice();
return seqNo;
}
long addDocValuesUpdates(DocValuesUpdate... updates) {
long seqNo = add(new DocValuesUpdatesNode(updates));
tryApplyGlobalSlice();
return seqNo;
}
long add(Node<?> deleteNode, DeleteSlice slice) {
long seqNo = add(deleteNode);
slice.sliceTail = deleteNode;
tryApplyGlobalSlice();
return seqNo;
}
把gloablSlice的数据apply到globalBufferedUpdates
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
ensureOpen();
try {
if (updateSliceNoSeqNo(globalSlice)) {// 如果globalSlice有新数据,则把数据都更新到globalBufferedUpdates
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
} finally {
globalBufferLock.unlock();
}
}
}
把globalBufferedUpdates转成FrozenBufferedUpdates
FrozenBufferedUpdates maybeFreezeGlobalBuffer() {
globalBufferLock.lock();
try {
if (closed == false) {
return freezeGlobalBufferInternal(tail);
} else {
return null;
}
} finally {
globalBufferLock.unlock();
}
}
private FrozenBufferedUpdates freezeGlobalBufferInternal(final Node<?> currentTail) {
if (globalSlice.sliceTail != currentTail) { // global中还有数据没有apply到globalBufferedUpdates
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
if (globalBufferedUpdates.any()) { // globalBufferedUpdates中有数据
final FrozenBufferedUpdates packet =
new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
globalBufferedUpdates.clear();
return packet;
} else {
return null;
}
}
更新指定的DeleteSlice
// 参数slice就是DocumentsWriterPerThread中私有的DeleteSlice
synchronized long updateSlice(DeleteSlice slice) {
ensureOpen();
long seqNo = getNextSequenceNumber();
if (slice.sliceTail != tail) {
slice.sliceTail = tail;
// 如果有更新,则返回的 seqNo 是负数
seqNo = -seqNo;
}
return seqNo;
}
boolean updateSliceNoSeqNo(DeleteSlice slice) {
if (slice.sliceTail != tail) {
slice.sliceTail = tail;
return true;
}
return false;
}
详细流程
后文介绍IndexWriter的增删改时全流程解析。