Lucene源码系列(三十三):文档删除和DocValues更新

461 阅读11分钟

背景

Lucene中删除文档主要有两种途径:

  • 直接删除:直接根据term或者query条件进行删除
    • IndexWriter#deleteDocuments(Query...)
    • IndexWriter#deleteDocuments(Term...)
  • 间接删除:lucene中更新文档,是先添加新文档,再删除满足term条件的文档
    • IndexWriter#updateDocument(Term, doc)
    • IndexWriter#updateDocuments(Term, docs)

文档删除的过程其实比较复杂,需要解决的问题有:

  • 怎么确定删除条件的作用范围?范围又分为两种,一个是作用在哪些segment,另一个是作用在segment中的doc范围。
  • 已经flush的segment和还未flush的segment删除逻辑分别怎么处理?
  • 删除结果的持久化时机?持久化的时间点是不是和可见的时间点相同?

Lucene中还有一种操作,和文档删除的处理逻辑很相似:DocValues的更新。这个我们本文一并介绍。

确定删除的范围

删除范围纯属设计层面的东西,不需要详细了解源码也可以大概来说清楚。因此,我们先看下删除的范围是怎么确定的?

先简单介绍几个相关的组件,详细的介绍见后文:

  • DeleteSlice:一个链表,链表中的节点都是删除条件,不同的删除条件是不同的Node,比如根据term删除的TermNode,根据query删除的QueryNode等。注意:所有DeleteSlice头结点指向的节点是个哨兵节点。
  • IndexWriter:索引管理的入口,提供了新增文档,删除文档,更新文档等接口,新增文档或者更新文档的时候会选择一个DocumentsWriterPerThread来执行,没有的话会创建。IndexWriter持有全局的DeleteSlice,这个队列中的删除条件是作用于所有已经存在的segment中的所有doc。
  • DocumentsWriterPerThread:对应了一个未flush的segment,DocumentsWriterPerThread维护了一个私有的DeleteSlice队列,其实是全局DeleteSlice中的一段,这个私有DeleteSlice中所有的删除条件作用范围就是DocumentsWriterPerThread对应的未flush的segment,会在flush的时候处理。而删除条件作用的doc范围就是删除条件加入队列时候DocumentsWriterPerThread中已有的doc。

下面我举个例子,需要注意的是为了方便描述队列的变化过程,我简化了队列的维护逻辑。

初始状态

文档删除1.png

如上图所示,初始状态下,也就是初始化IndexWriter,假设我们现在是重新打开一个索引,索引中已经有两个segment。当前的全局DeleteSlice是个空链表,但是不管全局DeleteSlice怎么变,全局DeleteSlice中的所有删除条件作用范围都包括segment0和segment1。

通过IndexWriter删除文档

文档删除2.png

如上图所示,当通过IndexWriter的删除文档接口删除文档时,会把删除的条件加入到全局DeleteSlice。

通过IndexWriter新增文档

文档删除3.png

当通过IndexWriter的新增文档的时候,它会创建一个DWPT1来处理,DWPT1的私有DeleteSlice头尾都指向全局DeleteSlice的尾结点。注意,我前面说了,DeleteSlice的头结点都是一个哨兵节点,因此当前全局DeleteSlice的尾结点只是私有DeleteSlice的哨兵节点。

通过IndexWriter删除文档

文档删除4.png

当通过IndexWriter删除文档时,把删除条件加入全局DeleteSlice,注意DWPT1的私有DeleteSlice的tail也会更新,这里我们简化,当做立即也更新,实际上更新的时间点有3个:新增,更新或者flush。还有一个需要注意的是,此时新增的删除条件对于DWPT1中doc的作用范围,就是目前的最大的docID。

通过IndexWriter更新文档

文档删除5.png

当通过IndexWriter更新文档时,假设这时候IndexWriter还是通过DWPT1来进行更新,DWPT1会新增文档,然后把删除条件加入全局DeleteSlice,更新私有DeleteSlice的tail。

通过IndexWriter新增文档

文档删除6.png

当通过IndexWriter新增文档时,假设这时候IndexWriter又创建了一个DWPT2来处理,DWPT2的私有DeleteSlice的head和tail指向了当前全局DeleteSlice的尾结点。

通过IndexWriter更新文档

文档删除7.png

到这一步我也不用分析了,相信大家都已经比较清楚了。

假设到这一步了,现在要进行全局flush了,则全局DeleteSlice的所有删除条件都作用于segment0和segment1。DWPT1和DWPT2私有的DeleteSlice只作用于DWPT1和DWPT2各自即将生成的segment。

至于删除条件怎么作用的,怎么持久化删除信息的,我们后面的源码中再介绍。

相关知识

BufferedUpdates

BufferedUpdates是存储未持久化的删除和更新数据,其实就是个缓存。Lucene支持根据term删除文档,也支持根据query删除文档,还支持DocValues的更新,BufferedUpdates就是用来存储这些更新和删除的信息,在flush的时候会转成FrozenBufferedUpdates,最终进行持久化。

成员变量

BufferedUpdates的核心成员变量,主要是几个用来进行缓存的map:

// 根据term匹配条件删除的个数
final AtomicInteger numTermDeletes = new AtomicInteger();

// docValue更新的个数
final AtomicInteger numFieldUpdates = new AtomicInteger();

// key是删除的term条件,value是生效的docId上界
final Map<Term, Integer> deleteTerms = new HashMap<>();

// key是删除的query条件,value是生效的docId上界
final Map<Query, Integer> deleteQueries = new HashMap<>();

// key是字段名称,value是对应字段的docValue的更新信息
final Map<String, FieldUpdatesBuffer> fieldUpdates = new HashMap<>();

核心方法

主要是一些把相关的删除和更新的信息加入对应的缓存中。

// 根据query删除
public void addQuery(Query query, int docIDUpto) {
  Integer current = deleteQueries.put(query, docIDUpto);
  // 如果是新增的,则需要更新占用的内存大小
  if (current == null) {
    bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
  }
}

// 根据term删除
public void addTerm(Term term, int docIDUpto) {
  Integer current = deleteTerms.get(term);
  if (current != null && docIDUpto < current) { // 如果已经有了按term删除的,并且有效的docID上限更大,则忽略当前的删除动作
    return;
  }

  deleteTerms.put(term, Integer.valueOf(docIDUpto));
  numTermDeletes.incrementAndGet();
  if (current == null) { // 新增的需要更新内存占用信息
    termsBytesUsed.addAndGet(
        BYTES_PER_DEL_TERM + term.bytes.length + (Character.BYTES * term.field().length()));
  }
}

// NumericDocValues的更新
void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
  // 获取字段的  FieldUpdatesBuffer ,如果是这个字段第一次更新docValues,则创建一个
  FieldUpdatesBuffer buffer =
      fieldUpdates.computeIfAbsent(
          update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
  if (update.hasValue) {
    buffer.addUpdate(update.term, update.getValue(), docIDUpto);
  } else {
    buffer.addNoValue(update.term, docIDUpto);
  }
  numFieldUpdates.incrementAndGet();
}

// BinaryDocValues的更新
void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
  // 获取字段的  FieldUpdatesBuffer ,如果是这个字段第一次更新docValues,则创建一个  
  FieldUpdatesBuffer buffer =
      fieldUpdates.computeIfAbsent(
          update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
  if (update.hasValue) {
    buffer.addUpdate(update.term, update.getValue(), docIDUpto);
  } else {
    buffer.addNoValue(update.term, docIDUpto);
  }
  numFieldUpdates.incrementAndGet();
}

FrozenBufferedUpdates

FrozenBufferedUpdates和BufferedUpdates最大的区别有两个:

  • FrozenBufferedUpdates不支持新增了,所以把删除信息用更高效的的存储结构进行存储
  • BufferedUpdates中存储删除信息还只是原始条件,但是FrozenBufferedUpdates中提供了把条件转成匹配docID的方法

成员变量

// 存储所有的term条件,term按序存储
final PrefixCodedTerms deleteTerms;

// 根据query删除的信息
final Query[] deleteQueries;
final int[] deleteQueryLimits;

// 用来判断是否所有删除和更新信息都完成apply了
public final CountDownLatch applied = new CountDownLatch(1);
// 只能由一个线程进行apply
private final ReentrantLock applyLock = new ReentrantLock();

// docValues的更新信息
private final Map<String, FieldUpdatesBuffer> fieldUpdates;

// 更新和删除的总数
public long totalDelCount;
// docValues更新总数
private final int fieldUpdatesCount;
// term删除总数
final int numTermDeletes;

// 这个是用来判断当前FrozenBufferedUpdates作用的segment的范围,segment中也有一个bufferedDeletesGen
// FrozenBufferedUpdates起作用的segment,是bufferedDeletesGen >= delGen
private long delGen = -1;

// 如果FrozenBufferedUpdates内的更新和删除信息都是一个段私有的,则privateSegment就是这个段的信息
// 如果是段私有的,则只有根据query删除,和docValues更新的信息,因为根据term删除在org.apache.lucene.index.FreqProxTermsWriter#applyDeletes已经处理了
final SegmentCommitInfo privateSegment; 

核心方法

构造函数

在构造函数中主要做了以下几件事:

  1. 把term删除条件中的term排序之后使用前缀编码的数据进行压缩存储
  2. 把map结构的query删除信息转成数组存储,因为map中可能是有闲置空间的
  3. 执行FieldUpdatesBuffer的finish方法
public FrozenBufferedUpdates(
    InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
  this.infoStream = infoStream;
  this.privateSegment = privateSegment;
  // 获取所有根据term删除的的term集合
  Term[] termsArray = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]);
  // 对term进行排序,先按field排序,再按term value排序
  ArrayUtil.timSort(termsArray);
  // 从名字可以看出,就是前缀编码,也就是对同一个field的term的value进行前缀编码
  PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder();
  for (Term term : termsArray) {
    builder.add(term);
  }
  deleteTerms = builder.finish();

  // 把 BufferedUpdates 中的deleteQueries转成数组存储
  // 因为我们知道map的大小有可能是有浪费的,转成数组就是按需使用空间  
  deleteQueries = new Query[updates.deleteQueries.size()];
  deleteQueryLimits = new int[updates.deleteQueries.size()];
  int upto = 0;
  for (Map.Entry<Query, Integer> ent : updates.deleteQueries.entrySet()) {
    deleteQueries[upto] = ent.getKey();
    deleteQueryLimits[upto] = ent.getValue();
    upto++;
  }
  // 在flush之前,所有的FieldUpdatesBuffer需要进行finish
  updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish);
  this.fieldUpdates = Map.copyOf(updates.fieldUpdates);
  this.fieldUpdatesCount = updates.numFieldUpdates.get();

  bytesUsed =
      (int)
          ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY)
              + updates.fieldUpdatesBytesUsed.get());

  numTermDeletes = updates.numTermDeletes.get();
}
apply所有的更新和删除

真正apply所有更新和删除信息的入口,在里面分别执行了term删除,query删除和DocValues更新。

long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
  // FrozenBufferedUpdates是加入BufferedUpdatesStream中,等待apply
  // BufferedUpdatesStream为FrozenBufferedUpdates分配delGen,确定apply生效的segment
  if (delGen == -1) {
    throw new IllegalArgumentException(
        "gen is not yet set; call BufferedUpdatesStream.push first");
  }

  totalDelCount += applyTermDeletes(segStates);
  totalDelCount += applyQueryDeletes(segStates);
  totalDelCount += applyDocValuesUpdates(segStates);

  return totalDelCount;
}
apply根据term的删除

这个方法主要是对已经存在的segment的处理。如果是还未flush的段的term删除,在flush的时候就已经被处理了,这部分逻辑在org.apache.lucene.index.FreqProxTermsWriter#applyDeletes。

apply term删除条件,就是遍历所有的segment,读取segment的对应term的倒排信息,获取docID列表,和term删除条件的docUpTo对比,满足条件的就删除。删除的方式就是把docID记录到segment对应的ReadersAndUpdates的pendingDeletes中,最后持久化的时候会生成liv文件。

private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
  if (deleteTerms.size() == 0) {
    return 0;
  }

  // 如果deleteTerms非空,则说明这些删除信息不是段私有的
  assert privateSegment == null;

  long startNS = System.nanoTime();
  long delCount = 0;
  // 遍历所有的段
  for (BufferedUpdatesStream.SegmentState segState : segStates) {
    //  不属于删除的范围
    if (segState.delGen > delGen) {
      continue;
    }
    if (segState.rld.refCount() == 1) { // merge的情况,以后介绍
      continue;
    }

    FieldTermIterator iter = deleteTerms.iterator();
    BytesRef delTerm;
    TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
    while ((delTerm = iter.next()) != null) {
      // 获取term的倒排信息,得到docID列表  
      final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
      if (iterator != null) {
        int docID;
        // 删除所有的匹配的文档  
        while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
          // rld是  ReadersAndUpdates,其中有个pendingDeletes用来记录存活的docID
          if (segState.rld.delete(docID)) {
            delCount++;
          }
        }
      }
    }
  }

  return delCount;
}
apply根据query的删除

query删除也是遍历所有的segment,然后执行query查询找到匹配的docID,和query删除条件的docUpTo对比,满足条件的就删除,删除的方式和term删除一样。

这里涉及到一些query查询的组件,我们以后再详细介绍。

private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates)
    throws IOException {
  if (deleteQueries.length == 0) {
    return 0;
  }

  long startNS = System.nanoTime();
  long delCount = 0;
  for (BufferedUpdatesStream.SegmentState segState : segStates) {
    if (delGen < segState.delGen) {
      continue;
    }

    if (segState.rld.refCount() == 1) { // merge的情况,以后介绍
      continue;
    }

    final LeafReaderContext readerContext = segState.reader.getContext();
    for (int i = 0; i < deleteQueries.length; i++) {
      Query query = deleteQueries[i];
      int limit;
      if (delGen == segState.delGen) {
        limit = deleteQueryLimits[i];
      } else {
        limit = Integer.MAX_VALUE;
      }
      final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
      searcher.setQueryCache(null);
      query = searcher.rewrite(query);
      final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
      final Scorer scorer = weight.scorer(readerContext);
      if (scorer != null) {
        final DocIdSetIterator it = scorer.iterator();
        if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
          int docID;
          while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (segState.rld.sortMap.newToOld(docID) < limit) {
              if (segState.rld.delete(docID)) {
                delCount++;
              }
            }
          }
        } else {
          int docID;
          while ((docID = it.nextDoc()) < limit) {
            if (segState.rld.delete(docID)) {
              delCount++;
            }
          }
        }
      }
    }
  }

  return delCount;
}
appy所有DocValues的更新
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates)
    throws IOException {

  if (fieldUpdates.isEmpty()) {
    return 0;
  }

  long startNS = System.nanoTime();

  long updateCount = 0;

  for (BufferedUpdatesStream.SegmentState segState : segStates) {

    if (delGen < segState.delGen) {
      continue;
    }

    if (segState.rld.refCount() == 1) {
      continue;
    }
    final boolean isSegmentPrivateDeletes = privateSegment != null;
    if (fieldUpdates.isEmpty() == false) {
      updateCount +=
          applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes);
    }
  }

  return updateCount;
}

private static long applyDocValuesUpdates(
    BufferedUpdatesStream.SegmentState segState,
    Map<String, FieldUpdatesBuffer> updates,
    long delGen,
    boolean segmentPrivateDeletes)
    throws IOException {
  long updateCount = 0;

  final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
  for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
    String updateField = fieldUpdate.getKey();
    DocValuesFieldUpdates dvUpdates = null;
    FieldUpdatesBuffer value = fieldUpdate.getValue();
    boolean isNumeric = value.isNumeric();
    FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
    FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
    TermDocsIterator termDocsIterator =
        new TermDocsIterator(segState.reader, iterator.isSortedTerms());
    while ((bufferedUpdate = iterator.next()) != null) {
      final DocIdSetIterator docIdSetIterator =
          termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
      if (docIdSetIterator != null) {
        final int limit;
        if (delGen == segState.delGen) {
          limit = bufferedUpdate.docUpTo;
        } else {
          limit = Integer.MAX_VALUE;
        }
        final BytesRef binaryValue;
        final long longValue;
        if (bufferedUpdate.hasValue == false) {
          longValue = -1;
          binaryValue = null;
        } else {
          longValue = bufferedUpdate.numericValue;
          binaryValue = bufferedUpdate.binaryValue;
        }
        if (dvUpdates == null) {
          if (isNumeric) {
            if (value.hasSingleValue()) {
              dvUpdates =
                  new NumericDocValuesFieldUpdates.SingleValueNumericDocValuesFieldUpdates(
                      delGen, updateField, segState.reader.maxDoc(), value.getNumericValue(0));
            } else {
              dvUpdates =
                  new NumericDocValuesFieldUpdates(
                      delGen,
                      updateField,
                      value.getMinNumeric(),
                      value.getMaxNumeric(),
                      segState.reader.maxDoc());
            }
          } else {
            dvUpdates =
                new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
          }
          resolvedUpdates.add(dvUpdates);
        }
        final IntConsumer docIdConsumer;
        final DocValuesFieldUpdates update = dvUpdates;
        if (bufferedUpdate.hasValue == false) {
          docIdConsumer = doc -> update.reset(doc);
        } else if (isNumeric) {
          docIdConsumer = doc -> update.add(doc, longValue);
        } else {
          docIdConsumer = doc -> update.add(doc, binaryValue);
        }
        final Bits acceptDocs = segState.rld.getLiveDocs();
        if (segState.rld.sortMap != null && segmentPrivateDeletes) {
          // This segment was sorted on flush; we must apply seg-private deletes carefully in this
          // case:
          int doc;
          while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (acceptDocs == null || acceptDocs.get(doc)) {
              // The limit is in the pre-sorted doc space:
              if (segState.rld.sortMap.newToOld(doc) < limit) {
                docIdConsumer.accept(doc);
                updateCount++;
              }
            }
          }
        } else {
          int doc;
          while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (doc >= limit) {
              break; // no more docs that can be updated for this term
            }
            if (acceptDocs == null || acceptDocs.get(doc)) {
              docIdConsumer.accept(doc);
              updateCount++;
            }
          }
        }
      }
    }
  }

  // now freeze & publish:
  for (DocValuesFieldUpdates update : resolvedUpdates) {
    if (update.any()) {
      update.finish();
      segState.rld.addDVUpdate(update);
    }
  }

  return updateCount;
}

BufferedUpdatesStream

BufferedUpdatesStream用来存储所有的FrozenBufferedUpdates,所有的FrozenBufferedUpdates都需要加入BufferedUpdatesStream获取delGen,从而能在apply的时候确定作用的segment范围。

核心方法

添加一个FrozenBufferedUpdates
synchronized long push(FrozenBufferedUpdates packet) {
  // 为FrozenBufferedUpdates分配delGen
  packet.setDelGen(nextGen++);

  updates.add(packet);
  numTerms.addAndGet(packet.numTermDeletes);
  bytesUsed.addAndGet(packet.bytesUsed);

  return packet.delGen();
}
apply所有的FrozenBufferedUpdates
  void waitApplyAll(IndexWriter writer) throws IOException {
    Set<FrozenBufferedUpdates> waitFor;
    synchronized (this) {
      waitFor = new HashSet<>(updates);
    }

    waitApply(waitFor, writer);
  }

  private void waitApply(Set<FrozenBufferedUpdates> waitFor, IndexWriter writer)
      throws IOException {

    long startNS = System.nanoTime();

    int packetCount = waitFor.size();

    if (waitFor.isEmpty()) {
      return;
    }

    ArrayList<FrozenBufferedUpdates> pendingPackets = new ArrayList<>();
    long totalDelCount = 0;
    for (FrozenBufferedUpdates packet : waitFor) {
      if (writer.tryApply(packet) == false) {
        pendingPackets.add(packet);
      }
      totalDelCount += packet.totalDelCount;
    }
    for (FrozenBufferedUpdates packet : pendingPackets) {
      writer.forceApply(packet);
    }
  }

PendingDeletes

PendingDeletes是用来存储segment的删除信息的。

需要强调的是,PendingDeletes存储的是segment中所有的删除信息,包括已经持久化或者还未持久化的,所以liv文件的更新是全量的,最新的liv文件包含了旧的liv文件的删除信息,因此,如果是在KeepOnlyLastCommitDeletionPolicy索引删除策略的情况下,只会保留最新版本的liv文件。

成员变量

// 对应段的提交信息
protected final SegmentCommitInfo info;
// 只读的,如果段中所有文档都是存活的或者是未初始化则是null
private Bits liveDocs;
// 可写的用来记录存活的文档,有新的删除需要更新才进行初始化
private FixedBitSet writeableLiveDocs;
// 待删除的文档个数
protected int pendingDeleteCount;
// 是否初始化过
boolean liveDocsInitialized;

核心方法

新增一个删除的docID

getMutableBits中看到,writeableLiveDocs优先是基于liveDocs来进行初始化的,因此当前的删除信息总是包含了旧的删除信息。

// 删除文档
// 如果文档确实被删除了,则返回true
// 如果文档已经删除过了,则返回false
boolean delete(int docID) throws IOException {
  // 获取可写的存活文档位图记录器
  FixedBitSet mutableBits = getMutableBits();
  // 判断当前文档是不是存活的
  final boolean didDelete = mutableBits.get(docID);
  if (didDelete) { // 如果要删除的文档是存活的,则执行删除操作
    mutableBits.clear(docID);
    pendingDeleteCount++;
  }
  return didDelete;
}

// 获取writeableLiveDocs
protected FixedBitSet getMutableBits() {
  // 如果writeableLiveDocs还没有被初始化
  if (writeableLiveDocs == null) {
    if (liveDocs != null) { // 如果段当前已经存在删除的文档,则直接拷贝一份
      writeableLiveDocs = FixedBitSet.copyOf(liveDocs);
    } else { // 如果当前段还没有文档被删除,则新建一个表示所有文档都存活的位图
      writeableLiveDocs = new FixedBitSet(info.info.maxDoc());
      writeableLiveDocs.set(0, info.info.maxDoc());
    }
    // 注意,这个方法使得  writeableLiveDocs 和 liveDocs底层存储信息的数组是同一个,
    // 因此writeableLiveDocs的更新也会反映在liveDocs中
    liveDocs = writeableLiveDocs.asReadOnlyBits();
  }
  return writeableLiveDocs;
}
生成liv文件

持久化的时候就是持久化当前的liveDocs,也就是最新的删除信息。

boolean writeLiveDocs(Directory dir) throws IOException {
  if (pendingDeleteCount == 0) { // 没有待删除的文档
    return false;
  }

  Bits liveDocs = this.liveDocs;
  // 把Directory封装成TrackingDirectoryWrapper,是为了如果在生成liv文件的过程中,
  // 如果发生任何异常,则把过程中生成的文件都删除
  TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);

  boolean success = false;
  try {
    Codec codec = info.info.getCodec();
    codec
        .liveDocsFormat()
        .writeLiveDocs(liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
    success = true;
  } finally {
    if (!success) {
      // 如果落盘失败,则删除所有新生成的文件
      for (String fileName : trackingDir.getCreatedFiles()) {
        IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
      }
    }
  }

  // 落盘成功,pendingDeleteCount 设为0,表示当前没有未落盘的删除信息,跟方法开头的判断先呼应
  dropChanges();
  return true;
}

FieldUpdatesBuffer

docvalues的更新都是通过term查询来获取更新的doc集合。

工具类

DocValuesUpdate

DocValuesUpdate封装了一次DocValues更新的全部信息:

  // DocValues的类型,目前支持更新的只有两种:NUMERIC和BINARY
  final DocValuesType type;
  // 查找更新文档的term条件
  final Term term;
  // 要更新的字段
  final String field;
  // 当前更新范围的docID上界
  final int docIDUpTo;
  // 是否是有值更新,如果是无值的,相当于把旧值清除
  final boolean hasValue;

成员变量

// 记录当前FieldUpdatesBuffer的内存占用
private final Counter bytesUsed;
// 记录第几次的更新操作
private int numUpdates = 1;
// 存储更新的term条件中的value,把它当成一个数组
private final BytesRefArray termValues;
// 可以获取按照指定的排序规则排好序的下标列表
private BytesRefArray.SortState termSortState;
// 存储binary docvalues更新的值
private final BytesRefArray byteValues;
// 每次更新操作对应的docID上限
private int[] docsUpTo;
// 存储numeric docvalues更新的值
private long[] numericValues; 
// 标记对应的更新是否有值,如果是无值更新,相当于把满足term条件的指定的字段的docvalue清空
private FixedBitSet hasValues;
// 记录最大值和最小值是为了优化存储空间,详细可以看之前关于numericDocValues的介绍
private long maxNumeric = Long.MIN_VALUE;
private long minNumeric = Long.MAX_VALUE;
// 每次更新对应的字段
private String[] fields;
// 是否是对NumericDocValues的更新
private final boolean isNumeric;
// 是否已经结束了,等待flush
private boolean finished = false;

核心方法

构造方法

这里需要注意的是,FieldUpdatesBuffer是一个字段一个的,FieldUpdatesBuffer会以字段的第一次更新的信息来进行初始化,也就是构造函数中的initialValue。fields,docsUpTo都会以initialValue中的对应信息作为数组的第一个元素,之后采用延迟的扩容方式,直到与第一个元素不相同的更新出现,才会把所有的元素都补齐。如果initialValue是有值更新,则记录是否存在值的hasValues不初始化,它是延迟初始化,直到碰到一个无值更新为止,才进行初始化。

这里可能让大家云里雾里,后面介绍新增更新的核心方法的时候就理解了,这里只留个印象就好。

private FieldUpdatesBuffer(
    Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) {
  this.bytesUsed = bytesUsed;
  this.bytesUsed.addAndGet(SELF_SHALLOW_SIZE);
  termValues = new BytesRefArray(bytesUsed);
  termValues.append(initialValue.term.bytes);
  fields = new String[] {initialValue.term.field};
  bytesUsed.addAndGet(sizeOfString(initialValue.term.field));
  docsUpTo = new int[] {docUpTo};
  // 如果是无值更新,才进行  hasValues 的初始化
  if (initialValue.hasValue == false) {
    hasValues = new FixedBitSet(1);
    bytesUsed.addAndGet(hasValues.ramBytesUsed());
  }
  this.isNumeric = isNumeric;
  byteValues = isNumeric ? null : new BytesRefArray(bytesUsed);
}

// NumericDocValues更新的构造函数
FieldUpdatesBuffer(
    Counter bytesUsed, DocValuesUpdate.NumericDocValuesUpdate initialValue, int docUpTo) {
  this(bytesUsed, initialValue, docUpTo, true);
  if (initialValue.hasValue()) {
    numericValues = new long[] {initialValue.getValue()};
    maxNumeric = minNumeric = initialValue.getValue();
  } else { // 无值更新默认是0
    numericValues = new long[] {0};
  }
  bytesUsed.addAndGet(Long.BYTES);
}

// BinaryDocValues更新的构造函数
FieldUpdatesBuffer(
    Counter bytesUsed, DocValuesUpdate.BinaryDocValuesUpdate initialValue, int docUpTo) {
  this(bytesUsed, initialValue, docUpTo, false);
  if (initialValue.hasValue()) {
    byteValues.append(initialValue.getValue());
  }
}
记录更新的信息
void add(String field, int docUpTo, int ord, boolean hasValue) {
  // fields数组判断是否需要扩容,2个条件:
  // 1.当前field和第一个field不一样  
  // 2.已经扩容过一次了,也就是fields.length != 1
  if (fields[0].equals(field) == false || fields.length != 1) {
    if (fields.length <= ord) {
      String[] array = ArrayUtil.grow(fields, ord + 1);
      if (fields.length == 1) { // 如果是第一次扩容则从下标1到ord的值和fields[0]相同
        Arrays.fill(array, 1, ord, fields[0]);
      }
      bytesUsed.addAndGet(
          (array.length - fields.length) * RamUsageEstimator.NUM_BYTES_OBJECT_REF);
      fields = array;
    }
    if (field != fields[0]) {
      bytesUsed.addAndGet(sizeOfString(field));
    }
    fields[ord] = field;
  }

  // docsUpTo数组判断是否需要扩容,2个条件:
  // 1.当前docUpTo和第一个docUpTo不一样  
  // 2.已经扩容过一次了,也就是docsUpTo.length != 1 
  if (docsUpTo[0] != docUpTo || docsUpTo.length != 1) {
    if (docsUpTo.length <= ord) {
      int[] array = ArrayUtil.grow(docsUpTo, ord + 1);
      if (docsUpTo.length == 1) { // 如果是第一次扩容则从下标1到ord的值和docsUpTo[0]相同
        Arrays.fill(array, 1, ord, docsUpTo[0]);
      }
      bytesUsed.addAndGet((array.length - docsUpTo.length) * Integer.BYTES);
      docsUpTo = array;
    }
    docsUpTo[ord] = docUpTo;
  }

  // hasValues判断是否需要扩容,2个条件:
  // 1.当前的更新是无值更新  
  // 2.hasValues已经初始化过了  
  if (hasValue == false || hasValues != null) {
    if (hasValues == null) { // 第一次初始化,说明碰到的是第一个无值更新的,从0到ord都是有值更新
      hasValues = new FixedBitSet(ord + 1);
      hasValues.set(0, ord);
      bytesUsed.addAndGet(hasValues.ramBytesUsed());
    } else if (hasValues.length() <= ord) {
      FixedBitSet fixedBitSet =
          FixedBitSet.ensureCapacity(hasValues, ArrayUtil.oversize(ord + 1, 1));
      bytesUsed.addAndGet(fixedBitSet.ramBytesUsed() - hasValues.ramBytesUsed());
      hasValues = fixedBitSet;
    }
    if (hasValue) { // 按实际情况进行设置
      hasValues.set(ord);
    }
  }
}
记录新的条件
private int append(Term term) {
  termValues.append(term.bytes);
  return numUpdates++;
}
新增无值更新
void addNoValue(Term term, int docUpTo) {
  final int ord = append(term);
  add(term.field, docUpTo, ord, false);
}
新增有值的NumericDocValues更新
void addUpdate(Term term, long value, int docUpTo) {
  // 记录更新的条件,并获取当前是第几次更新  
  final int ord = append(term);
  String field = term.field;
  add(field, docUpTo, ord, true);
  minNumeric = Math.min(minNumeric, value);
  maxNumeric = Math.max(maxNumeric, value);
  // numericValues数组判断是否需要扩容,2个条件:
  // 1.当前value和第一个value不一样  
  // 2.已经扩容过一次了,也就是numericValues.length != 1   
  if (numericValues[0] != value || numericValues.length != 1) {
    if (numericValues.length <= ord) {
      long[] array = ArrayUtil.grow(numericValues, ord + 1);
      if (numericValues.length == 1) { // 如果是第一次扩容则从下标1到ord的值和numericValues[0]相同
        Arrays.fill(array, 1, ord, numericValues[0]);
      }
      bytesUsed.addAndGet((array.length - numericValues.length) * Long.BYTES);
      numericValues = array;
    }
    numericValues[ord] = value;
  }
}
新增有值的BinaryDocValues更新
void addUpdate(Term term, BytesRef value, int docUpTo) {
  final int ord = append(term);
  byteValues.append(value);
  add(term.field, docUpTo, ord, true);
}
结束添加

结束添加的时候处理了一种针对NumericDocValues更新的特殊情况,同时满足以下三个条件:

  1. 更新的值都相等
  2. 所有的更新都是有值更新
  3. 所有的term条件是同一个字段

针对这种特殊情况,Lcuene对所有缓存的待更新的信息进行了排序,先按term条件的值升序排序,再按docsUpTo降序,这样在处理的时候可以忽略一部分的更新,我们举个例子。

假设有以下更新列表:

termdocsUpTo
d3
c4
a5
d9
a1

经过排序之后,得到:

termdocsUpTo
a5
a1
c4
d9
d3

按序处理上面的列表,我们知道(a,5)已经包含了(a,1),所以(a,1)不用处理了。同理,(d,9)已经包含了(d,3),所以(d,3)不用处理了。

void finish() {
  if (finished) {
    throw new IllegalStateException("buffer was finished already");
  }
  finished = true;
  // 如果是对NumericDocValues更新,并且更新的值都相同,并且都是有值更新,并且term条件是同一个字段,则需要排序
  final boolean sortedTerms = hasSingleValue() && hasValues == null && fields.length == 1;
  if (sortedTerms) {
    // 先按term升序排序,再按docsUpTo降序
    termSortState =
        termValues.sort(
            Comparator.naturalOrder(),
            (i1, i2) ->
                Integer.compare(
                    docsUpTo[getArrayIndex(docsUpTo.length, i2)],
                    docsUpTo[getArrayIndex(docsUpTo.length, i1)]));
    bytesUsed.addAndGet(termSortState.ramBytesUsed());
  }
}

DocumentsWriterDeleteQueue

内部类

DeleteSlice

完整的删除链表是由多个线程共享的,每个线程自己的删除链表就是一个slice。这个每个线程就对应了一个

static class DeleteSlice {
  Node<?> sliceHead; // 哨兵头节点,该节点的信息不会被使用
  Node<?> sliceTail;

  DeleteSlice(Node<?> currentTail) {
    sliceHead = sliceTail = currentTail;
  }

  void apply(BufferedUpdates del, int docIDUpto) {
    if (sliceHead == sliceTail) {
      return;
    }

    Node<?> current = sliceHead;
    do {
      current = current.next;
      // 把删除信息加入del缓存中  
      current.apply(del, docIDUpto);
    } while (current != sliceTail);
    reset();
  }

  void reset() {
    sliceHead = sliceTail;
  }

  /**
   * Returns <code>true</code> iff the given node is identical to the slices tail, otherwise
   * <code>false</code>.
   */
  boolean isTail(Node<?> node) {
    return sliceTail == node;
  }

  /**
   * Returns <code>true</code> iff the given item is identical to the item hold by the slices
   * tail, otherwise <code>false</code>.
   */
  boolean isTailItem(Object object) {
    return sliceTail.item == object;
  }

  boolean isEmpty() {
    return sliceHead == sliceTail;
  }
}

节点Node

static class Node<T> {
  volatile Node<?> next;
  // 节点的信息  
  final T item;

  Node(T item) {
    this.item = item;
  }

  void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
    throw new IllegalStateException("sentinel item must never be applied");
  }

  boolean isDelete() {
    return true;
  }
}

private static final class TermNode extends Node<Term> {

  TermNode(Term term) {
    super(term);
  }

  @Override
  void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
    bufferedDeletes.addTerm(item, docIDUpto);
  }

  @Override
  public String toString() {
    return "del=" + item;
  }
}

private static final class QueryArrayNode extends Node<Query[]> {
  QueryArrayNode(Query[] query) {
    super(query);
  }

  @Override
  void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
    for (Query query : item) {
      bufferedUpdates.addQuery(query, docIDUpto);
    }
  }
}

private static final class TermArrayNode extends Node<Term[]> {
  TermArrayNode(Term[] term) {
    super(term);
  }

  @Override
  void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
    for (Term term : item) {
      bufferedUpdates.addTerm(term, docIDUpto);
    }
  }

  @Override
  public String toString() {
    return "dels=" + Arrays.toString(item);
  }
}

  private static final class DocValuesUpdatesNode extends Node<DocValuesUpdate[]> {

    DocValuesUpdatesNode(DocValuesUpdate... updates) {
      super(updates);
    }

    // 只支持numeric和binary docValues的更新
    void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
      for (DocValuesUpdate update : item) {
        switch (update.type) {
          case NUMERIC:
            bufferedUpdates.addNumericUpdate((NumericDocValuesUpdate) update, docIDUpto);
            break;
          case BINARY:
            bufferedUpdates.addBinaryUpdate((BinaryDocValuesUpdate) update, docIDUpto);
            break;
          case NONE:
          case SORTED:
          case SORTED_SET:
          case SORTED_NUMERIC:
          default:
            throw new IllegalArgumentException(
                update.type + " DocValues updates not supported yet!");
        }
      }
    }

    @Override
    boolean isDelete() {
      return false;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("docValuesUpdates: ");
      if (item.length > 0) {
        sb.append("term=").append(item[0].term).append("; updates: [");
        for (DocValuesUpdate update : item) {
          sb.append(update.field).append(':').append(update.valueToString()).append(',');
        }
        sb.setCharAt(sb.length() - 1, ']');
      }
      return sb.toString();
    }
  }

关键成员变量

// 在flush之前,所有的DocumentsWriterPerThread是共享的
private volatile Node<?> tail;

// 全局的删除链表,作用范围是当前所有的segment。
private final DeleteSlice globalSlice;

// flush的时候,globalSlice中所有的数据已经在globalBufferedUpdates中
private final BufferedUpdates globalBufferedUpdates;

核心方法

新增一个节点

注意是同步的,因为DocumentsWriterDeleteQueue是全局共享的。

synchronized long add(Node<?> newNode) {
  ensureOpen();
  tail.next = newNode;
  this.tail = newNode;
  return getNextSequenceNumber();
}
新增更新和删除节点
long addDelete(Query... queries) {
  long seqNo = add(new QueryArrayNode(queries));
  tryApplyGlobalSlice();
  return seqNo;
}

long addDelete(Term... terms) {
  long seqNo = add(new TermArrayNode(terms));
  tryApplyGlobalSlice();
  return seqNo;
}

long addDocValuesUpdates(DocValuesUpdate... updates) {
  long seqNo = add(new DocValuesUpdatesNode(updates));
  tryApplyGlobalSlice();
  return seqNo;
}

long add(Node<?> deleteNode, DeleteSlice slice) {
  long seqNo = add(deleteNode);
  slice.sliceTail = deleteNode;
  tryApplyGlobalSlice();
  return seqNo;
}
把gloablSlice的数据apply到globalBufferedUpdates
void tryApplyGlobalSlice() {
  if (globalBufferLock.tryLock()) {
    ensureOpen();
    try {
      if (updateSliceNoSeqNo(globalSlice)) {// 如果globalSlice有新数据,则把数据都更新到globalBufferedUpdates
        globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
      }
    } finally {
      globalBufferLock.unlock();
    }
  }
}
把globalBufferedUpdates转成FrozenBufferedUpdates
FrozenBufferedUpdates maybeFreezeGlobalBuffer() {
  globalBufferLock.lock();
  try {
    if (closed == false) {
      return freezeGlobalBufferInternal(tail); 
    } else {
      return null;
    }
  } finally {
    globalBufferLock.unlock();
  }
}

private FrozenBufferedUpdates freezeGlobalBufferInternal(final Node<?> currentTail) {
  if (globalSlice.sliceTail != currentTail) { // global中还有数据没有apply到globalBufferedUpdates
    globalSlice.sliceTail = currentTail;
    globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
  }

  if (globalBufferedUpdates.any()) { // globalBufferedUpdates中有数据
    final FrozenBufferedUpdates packet =
        new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
    globalBufferedUpdates.clear();
    return packet;
  } else {
    return null;
  }
}
更新指定的DeleteSlice
// 参数slice就是DocumentsWriterPerThread中私有的DeleteSlice
synchronized long updateSlice(DeleteSlice slice) {
  ensureOpen();
  long seqNo = getNextSequenceNumber();
  if (slice.sliceTail != tail) {
    slice.sliceTail = tail;
    // 如果有更新,则返回的  seqNo 是负数
    seqNo = -seqNo;
  }
  return seqNo;
}

boolean updateSliceNoSeqNo(DeleteSlice slice) {
  if (slice.sliceTail != tail) {
    slice.sliceTail = tail;
    return true;
  }
  return false;
}

详细流程

后文介绍IndexWriter的增删改时全流程解析。