阅读 435

从一次线上问题谈谈 Elasticsearch 读写架构

问题描述:用 Elasticsearch 消费 kafka 中的数据。kafka broker 版本不变,Elasticsearch 版本不变,Elasticsearch 的客户端(transport API)版本不变。kafka client 版本发生变化,由于之前使用 apache kafka 原生API 消费,消费模型为 一个 kafka consumer 对应多个 partition;现使用 spring kafka 重构,保证 一个partition 至少对应一个 kafka consumer。现有9个 topic,合计27个 partition。

问题一:Elasticsearch 抛出以下异常

java.util.concurrent.ExecutionException: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update]]; nested: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update[s]]]; nested: VersionConflictEngineException[[query][c3b8666b43b7cdab0450b0620b7f5e5df0633b35]: version conflict, document already exists (current version [1])];
复制代码
  1. Elasticsearch 在写 document 时是基于乐观锁的,所以上述问题发生肯定是因为并发写的问题
  2. 而现在正常情况下每个 partition 对应一个 kafka consumer,是不应该出现并发写(也就是重复消费情况)的
  3. 上述第二点忽略了一个重要前提,在 groupId 保持一致的情况下,才不会出现一个 partition 被多个 kafka consumer 消费的情况

结论:kafka client group id 未与之前保持一致

Tips:

Elasticsearch 是分布式的。当创建、更新或删除文档时,必须将文档的新版本复制到集群中的其他节点。Elasticsearch也是异步和并发的,这意味着这些复制请求是并行发送的,并且到达目的地的顺序可能不一致。Elasticsearch需要确保文档的旧版本不会覆盖新版本。

为了确保文档的旧版本不会覆盖新版本,对文档执行的每个操作都由协调该更改的主分片分配一个序列号。序列号随着每个操作的增加而增加,因此新操作的序列号一定比旧操作的序列号高。然后,Elasticsearch可以使用操作的序列号来确保新文档版本不会被分配了较小序列号的更改覆盖。

问题二:kafka 抛出以下异常

org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
复制代码

经排查测试

之前消费这默认是 RangeAssignor

partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
复制代码

现在优化为

partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
复制代码

所以会让 kafka 无法合理分配 partition

Elasticsearch 节点类型

Master Node
  • 职责
    1. 处理创建,删除索引等请求 / 决定分片⽚被分配到哪个节点 / 负责索引的创建与删除。
    2. 维护并且更新 Cluster State,且只能由 master node 维护,否则会造成集群状态不正常。
  • Master Node 的最佳实践
    1. Master 节点非常重要,在部署上需要考虑解决单点的问题。
    2. 为⼀个集群设置多个 Master 节点 / 每个节点只承担 Master 的单⼀角色。
Data Node
  • 职责:保存分片数据。在数据扩展上起到了至关重要的作用(由 Master Node 决定如何把分片分发到数据节点上)。
  • 节点启动后,默认就是数据节点。可以设置 node.data: false 禁止。
  • 通过增加数据节点,可以解决数据水平扩展和解决数据单点问题。
Master Eligible Nodes & 选主流程
  • ⼀个集群,⽀持配置多个 Master Eligible 节点。这些节点可以在必要时(如 Master 节点出现故障,网络故障时)参与选主流程,成为 Master 节点。
  • 节点启动后,默认就是⼀个 Master eligible 节点,设置 node.master: false 禁止。
  • 当集群内第⼀个 Master eligible 节点启动时候,它会将自己选举成 Master 节点。
Coordinating Node
  • 处理请求的节点,负责路由请求到正确的节点,如创建索引的请求需要路由到 Master 节点。
  • 所有节点默认都是 Coordinating Node。
  • 通过将其他类型(data node/master node/master eligible node)设置成 False,使其成为专门负责的协调的节点。
节点类型总结
节点类型 配置参数 默认值
master eligible node.master true
data node.data true
ingest node.ingest true
coordinating only 设置上面三个参数全部为false
machine learning node.ml true(需要enable x-pack)

Elasticsearch 写架构

Elasticsearch 在创建,更新甚至删除的时候会更改 document version。

Elasticsearch 如何做到高可用:
  1. 数据首先写入到 Index buffer(内存) 和 Transaction log(磁盘) 中,即便内存数据丢失,也可读取磁盘中的 Transaction log
  2. 默认 1s 一次的 refresh 操作将 Index buffer 中的数据写入 segments(内存),此时数据可查询
  3. 默认30分钟执行一次的 flush 操作,将 segments 写入磁盘,同时清空 Transaction log;若 Transaction log 满(默认512M),也会执行此操作。
  4. merge 操作,定期合并 segment

  Elasticsearch 中的每个索引操作首先使用路由解析到一个副本组,通常基于文档ID。一旦确定了副本组,操作将在内部转发到组的当前主分片。主分片负责验证数据格式并将其转发到其他副本。由于副本可以由主分片异步复制,所以不需要主副本复制到所有副本。相反,Elasticsearch 维护一个应该接收操作的副本分片列表。这个列表称为同步副本,由主节点维护。顾名思义,这些是一组保证处理了所有已向用户确认的索引操作和删除操作的分片副本。主分片负责维护,因此必须将所有操作复制到这个集合中的每个副本分片。

主分片遵循以下基本流程:

  • 验证传入操作并在结构无效时拒绝它(例如:插入时字段格式与 mapping 不匹配)
  • 在本地执行操作,即索引或删除相关文档。这还将验证字段的内容,并在需要时拒绝(例如:关键字值太长,无法在Lucene中进行索引)。
  • 将操作转发到当前同步复制集中的每个副本分片。如果有多个副本分片,则并行执行。
  • 一旦所有副本分片都成功地执行了操作并响应了主分片,主副本就会向客户端确认请求成功完成。
Lucene Index
  • 在 Lucene 中,单个倒排索引⽂件被称为Segment。Segment 是⾃包含的,不可变更的,多个 Segments 汇总在⼀起,称为 Lucene 的 Index,其对应的就是 ES 中的 Shard(分片)。然后另外使用一个 commit 文件,记录索引内所有的 segment。
  • 当有新文档写⼊时,会生成新 Segment,查询时会同时查询所有 Segments,并且对结果汇总。Lucene 中有⼀个⽂件,用来记录所有 Segments 信息,叫做 Commit Point。
  • 删除的⽂档信息,保存在“.del”文件中。
什么是 Refresh
  • 数据首先写入到 Index buffer(内存)中,此时数据不可被查询到。

  • 将 Index buffer 写入 Segment 的过程叫Refresh。Refresh 不执行 fsync 操作,此操作不会将数据写入磁盘。

  • Refresh 频率:默认 1 秒发生⼀次,可通过 index.refresh_interval 配置。Refresh 后,数据就可以被搜索到了。这也是为什么 Elasticsearch 被称为近实时搜索
  • 如果系统有⼤量的数据写入,那就会产生很多的 SegmentIndex Buffer 被占满时,会触发 Refresh,默认值是 JVM 的 10%。
什么是 Transaction log

  • Segment 写⼊磁盘的过程相对耗时,借助⽂件系统缓存,Refresh 时,先将 Segment 写入缓存以开放查询。
  • 为了保证数据不会丢失。所以在 Index 文档时,同时写 Transaction Log,⾼版本开始,Transaction Log 默认落盘。每个分片有⼀个 Transaction Log。
  • 在 ES Refresh 时,Index Buffer 被清空,Transaction log 不会清空。
什么是 Flush

ES Flush & Lucene Commit

  • 实际影响:调用 Refresh,Index Buffer 清空,调⽤ fsync,将缓存中的 Segments写⼊磁盘,清空(删除)Transaction Log
  • 调用时机:默认 30 分钟调用⼀次,或者 Transaction Log 满 (默认 512 MB)
什么是 Merge

  • Segment 很多,需要被定期被合并
    • 减少 Segments / 删除已经删除的文档
  • ES 和 Lucene 会自动进行 Merge 操作
    • POST my_index/_forcemerge

Elasticsearch 读流程

  Elasticsearch 使用主备模型。主备份模型的一个优点是,主分片和其所有副本分片存有相同的数据。因此,一个同步副本就足以满足读请求。

  Elasticsearch 中的读取可以直接使用 document ID,也可以是非常复杂的搜索,包含复杂的聚合,这个操作会占用大量CPU资源。

  当节点接收到读请求时,该节点负责将其转发给包含相关分片的节点、整合所有分片的返回值并响应客户端(类似于一个MapReduce)。我们将该节点称为请求的协调节点。基本流程如下:

  • 将读请求解析到相关分片。注意,由于大多数搜索将被发送到一个或多个索引,因此它们通常需要从多个分片中读取,每个分片表示数据的不同子集。
  • 从分片复制组中选择每个相关分片的活动副本。这可以是主分片,也可以是副本分片。默认情况下,Elasticsearch只是在副本分片之间进行循环。
  • 将分片级别的读请求发送到所选副本。
  • 整合结果并做出响应。注意,在get by ID查找的情况下,只有一个分片是相关的,可以跳过这一步。

思考

  1. 上线前要至少两人核实所有配置无误
  2. 一定要完全了解所使用中间件的基本架构与细节,Elasticsearch 写 document 时乐观锁形式实在是后知后觉

参考资料

  1. www.elastic.co/guide/en/el…
  2. 《Elasticsearch in action》
  3. wangnan.tech/post/elksta…
  4. www.elastic.co/guide/en/el…

本文由 发给官兵 创作,采用 CC BY 3.0 CN协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出 处。如转载至微信公众号,请在文末添加作者公众号二维码。

关注下面的标签,发现更多相似文章
评论