Elasticsearch 5.x 源码分析(5)segments merge 流程分析 - 简书

1,040 阅读8分钟
原文链接: www.jianshu.com

这两周主要看了下 Elasticsearch(其实是Lucene)的 segments 的 merge 流程。事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作,由于我们第一次ES上线,其实也不清楚这究竟是个问题还是本来 Lucene 就是这样,网上找了一些关于ES 或者 Lucene 的 merge 的策略介绍,除了说道大家都了解的一些常规的参数,如最大size,最大doc 下不会再做merge云云,就是没提到到了多少个 segments 之后就不 merge ,接着问了Elasticsearch 圈子的一些人,也没有找到非常确定的答案。查找几天资料无果,顺带就看看源码,最终在昨天又瞄了几眼终于发现一段算法,虽无验证,但应八九不离十,故记录分享之。


Segments

首先还是先重温一下 Lucene 下的 segments,对这个比较陌生的可以阅读三斗大神的这一节

Lucene产生segments示意图

我只引用最下面那张图介绍一下,绿色的就是已经固化的一个个的 segments 文件,不会再更新,左下角就是当前在内存的 Lucene 维护的查询可见的仍为持久化的segment,当Elasticsearch 配置的refresh_invterval (默认是1s,可调)到时,这些in in-memory buffer就会推送到OS的文件系统缓存中去,注意这里只是到缓存,很可能OS仍未持久化到文件系统,成为一个单独的 segment 文件,而啥时commit 到文件系统永不丢失,则由Lucene 的flush 机制保证,当Lucene 做完flush 则表明该 segment 真正推送到文件系统,此时才会在translog做标记并可以删除commit之前的translog 了。

注意这里只是一个简化描述,据三斗和赖总介绍,Lucene 仍有很多因素会促使产生一个segment 而不是百分百由Elasticsearch的refresh_interval 决定。这里就不继续讨论究竟在哪些情况会立即生成一个segment了。

Segment 的merge

详细信息可看三斗这一节 这里只引用两个图介绍一下

merge前
merge后

从图上看,Lucene每次会选取一些小的segments 进而merge到一个大的segment,我这里不再赘述流程和策略,这里只补充一句就是,如果你之前用的scroll查询,之前的scroll还是会指向老的segments,也就是说老的segments 的引用会知道scroll失效后才会被回收。


Elasticsearch 5.x merge 参数的变化

在老的Elasticsearch 中,merge 被认为是一个非常消耗资源的操作,甚至只有一个线程来做这事,并且会影响indexing的request。在之前的版本里,merge 操作用的是一类 merge throttle limit这样的配置来限制各种峰值数据,如下面这些参数,注意这些参数都已经在5.x 中移除掉了。

  • indices.store.throttle.type
  • indices.store.throttle.max_bytes_per_sec

因为在Elasticsearch 5.x 想采用多线程和动态调整这种方式来更加智能地去执行merge操作。如检测是否使用SSD硬盘,应该启动多少个merge线程等。
在Elasticsearch 5.x 下,tired merge policy 成为了唯一的merge策略。因此下面的参数同样也在5.x 下被移除了。

  • index.merge.policy.type
  • index.merge.policy.min_merge_size
  • index.merge.policy.max_merge_size
  • index.merge.policy.merge_factor
  • index.merge.policy.max_merge_docs
  • index.merge.policy.calibrate_size_by_deletes
  • index.merge.policy.min_merge_docs
  • index.merge.policy.max_merge_docs

如上面说的,ES希望采用一种更智能的方式去调整这些参数,达到一个性能的折中。在5下我们可以配置这些参数:

  • index.merge.policy.expunge_deletes_allowed: 指删除了的文档数在一个segment里占的百分比,默认是10,大于这个值时,在执行expungeDeletes 操作时将会merge这些segments.
  • index.merge.policy.floor_segment: 官网的解释我没大看懂,我的个人理解是ES会避免产生很小size的segment,小于这个阈值的所有的非常小的segment都会做merge直到达到这个floor 的size,默认是2MB.
  • index.merge.policy.max_merge_at_once: 一次最多只操作多少个segments,默认是10.
  • index.merge.policy.max_merge_at_once_explicit: 显示调用optimize 操作或者 expungeDeletes时可以操作多少个segments,默认是30.
  • index.merge.policy.max_merged_segment: 超过多大size的segment不会再做merge,默认是5g.
  • index.merge.policy.segments_per_tier: 每个tier允许的segement 数,注意这个数要大于上面的at_once数,否则这个值会先于最大可操作数到达,就会立刻做merge,这样会造成频繁
  • index.reclaim_deletes_weight: 考虑merge的segment 时删除文档数量多少的权重,默认即可.
  • index.compund_format: 还不知道干啥用的,默认即可.

merge 线程调整

Elasticsearch 5 采用了多线程去执行merge,可以通过修改index.merge.scheduler.max_thread_count 来动态调整这个线程数,默认的话是通过下面公式去计算:

Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2)) 

要注意的是如果你是用HDD而非SSD的磁盘的话,最好是用单线程为妙。


force merge API

这里有3个参数可以用

  • max_num_segments 期望merge到多少个segments,1的意思是强行merge到1个segment
  • only_expunge_deletes 只做清理有deleted的segments,即瘦身
  • flush 清理完执行一下flush,默认是true

你可以用下面的URL来执行强行的merge

curl -XPOST "http://localhost:9200/library/_forcemerge?max_num_segments=1

代码介绍

merge的代码相对比较少,因为基本上ES并没有做什么东西,只是采用了多线程,和直接调用Lucene的API而已,主要看几个类:

  • MergePolicyConfig.java 专门管理我们输入的那些参数
  • MergeScheduler.java 这个类其实就是封装了一下Lucene的几个调用
  • ConcurrentMergeScheduler.java 继承了MergeScheduler.java在上面实现了多线程分工并加上了多线程下的一些限制,如上面配置的那些最大XXX,最多XXX 之类的。
  • ElasticsearchConcurrentMergeScheduler.java 继承了ConcurrentMergeScheduler.java 通过配置去控制整个ES实例的merge 流程的运作,还有打印日志等。
  • TieredMergePolicy.java merge的策略控制类,之前提到了ES5后只剩下这个唯一的默认的策略控制,所有的选择,打分,触发merge的策略都在这里定义。

最后发现问题就出在TieredMergePolicy.java 上,再次回顾我遇到的问题

事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作。

然后我们跳到下面这个方法:

public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {

<snip> //这里主要拿到总bytes数,segments数,踢掉超过阈值的segments

    minSegmentBytes = floorSize(minSegmentBytes);
    // Compute max allowed segs in the index
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;

<snip>

问题就出在这段算法里了,tieredPolicy,顾名思义就是梯队,阶级,等级,就是把所有的segments 分层一个个的阶级,ES的设想就是 每个tier 都应该至少包含 segsPerTier 个segments,这样从上至下就可以分批的一次每个tier都可以做一轮merge操作,举个例子,如果我们按默认值floor的size是2MB,maxMergeAtOnce 使用默认10 的话,那么最低层就应该有 10 个 2MB的segments,做完merge 应该就会产生一个20MB的 segment,那么这个20MB应该就是下一个tier, 在这个tier里也应该有至少10个20MB的segments 来等待做merge,如此类推。
那我就用我遇到的例子来演绎一遍上面的算法,假设我有5个100MB的大segments,下面就只有少数几个的segments,segsPerTiermaxMergeAtOnce 都采用默认值10。

  1. 第一次while,segCountLevel = 500/2 = 250 显然大于10,所以allowedSegCount = 10bytesLeft = 500 - 2*10 = 480levelSize = 2*10 = 20
  2. 第二次while,segCountLevel = 480/20 = 24 还是大于10,所以allowedSegCount = 10 + 10 = 20bytesLeft = 480 - 20*10 = 280levelSize = 20*10 = 200
  3. 第三次while,segCountLevel = 280/200 = 1.4 小于10 了,所以allowedSegCount = 20 + 2 = 22; 退出

也就是说这次的merge 操作,根据当前segments总的字节数推算,ES应该是被允许最多merge 22 个segments;接着就是去找实际可以merge的总的eligible的segments数量

// Gather eligible segments for merging, ie segments
     // not already being merged and not already picked (by
     // prior iteration of this loop) for merging:
     final List<SegmentCommitInfo> eligible = new ArrayList<>();
     for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
       final SegmentCommitInfo info = infosSorted.get(idx);
       if (merging.contains(info)) {
         mergingBytes += size(info, writer);
       } else if (!toBeMerged.contains(info)) {
         eligible.add(info);
       }
     }

     final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

     if (verbose(writer)) {
       message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
     }

     if (eligible.size() == 0) {
       return spec;
     }

     if (eligible.size() > allowedSegCountInt) {
      //可以作为预备merge的segments大于允许的数,这轮merge可以做了
      //剩下就是为segments打分,选出一定数量的segments来merge
     } else {
      //达不到预期数量,不做了
     }

从上面看到,确实如果segments 不够ES被期望的达到那么多可被merge的segments 数量的时候,其实ES是不做merge的。那么就会在一种场景里面出现:

当索引达到了比较大时,这时经过了一定时间的merge 完成后,segments都会比较大,这时如果indexing的频率相对比较低时,则每轮merge 选择阶段就会得出ES期望这次可以merge的segments数就会比较大,而如果eligible的segments并没有那么大时,则ES就不会进行merge

这就是我得到的结论,还望各ES大神指点是否准确。