Kubernetes 并发控制与数据一致性的实现原理

4,108 阅读9分钟

在大型分布式系统中,定会存在大量并发写入的场景。在这种场景下如何进行更好的并发控制,即在多个任务同时存取数据时保证数据的一致性,成为分布式系统必须解决的问题。

悲观并发控制和乐观并发控制是并发控制中采用的主要技术手段,对于不同的业务场景,应该选择不同的控制方法。

悲观锁

悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)是一种并发控制的方法。它可以阻止一个事务以影响其他用户的方式来修改数据。如果一个事务执行的操作读某行数据应用了锁,那只有当这个事务把锁释放,其他事务才能够执行与该锁冲突的操作。

在悲观锁的场景下,假设用户 A 和 B 要修改同一个文件,A 在锁定文件并且修改的过程中,B 是无法修改这个文件的,只有等到 A 修改完成,并且释放锁以后,B 才可以获取锁,然后修改文件。由此可以看出,悲观锁对并发的控制持悲观态度,它在进行任何修改前,首先会为其加锁,确保整个修改过程中不会出现冲突,从而有效的保证数据一致性。但这样的机制同时降低了系统的并发性,尤其是两个同时修改的对象本身不存在冲突的情况。同时也可能在竞争锁的时候出现死锁,所以现在很多的系统例如 Kubernetes 采用了乐观并发的控制方法。

乐观锁

乐观并发控制(又名“乐观锁”,Optimistic Concurrency Control,缩写“OCC”)是一种并发控制的方法。它假设多用户并发的事务在处理时不会彼此影响,各事务能够在不请求锁的情况下处理各自的数据。在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。如果其他事务有更新的话,正在提交的事务会进行回滚。

相对于悲观锁对锁的提前控制,乐观锁相信请求之间出现冲突的概率是比较小的,在读取及更改的过程中都是不加锁的,只有在最后提交更新时才会检测冲突,因此在高并发量的系统中占有绝对优势。同样假设用户A和B要修改同一个文件,A和B会先将文件获取到本地,然后进行修改。如果A已经修改好并且将数据提交,此时B再提交,服务器端会告知B文件已经被修改,返回冲突错误。此时冲突必须由B来解决,可以将文件重新获取回来,再一次修改后提交。

乐观锁通常通过增加一个资源版本字段,来判断请求是否冲突。初始化时指定一个版本值,每次读取数据时将版本号一同读出,每次更新数据,同时也对版本号进行更新。当服务器端收到数据时,将数据中的版本号与服务器端的做对比,如果不一致,则说明数据已经被修改,返回冲突错误。

Kubernetes中的并发控制

在Kubernetes 集群中,外部用户及内部组件频繁的数据更新操作,导致系统的数据并发读写量非常大。假设采用悲观并行的控制方法,将严重损耗集群性能,因此 Kubernetes 采用乐观并行的控制方法。Kubernetes 通过定义资源版本字段实现了乐观并发控制,资源版本 (ResourceVersion)字段包含在 Kubernetes 对象的元数据 (Metadata)中。这个字符串格式的字段标识了对象的内部版本号,其取值来自 etcd 的 modifiedindex,且当对象被修改时,该字段将随之被修改。值得注意的是该字段由服务端维护,不建议在客户端进行修改。

type ObjectMeta struct {

      ......


      // An opaque value that represents the internal version of this object   that can

      // be used by clients to determine when objects have changed. May be   used for optimistic

      // concurrency, change detection, and the watch operation on a   resource or set of resources.

      // Clients must treat these values as opaque and passed unmodified   back to the server.

      // They may only be valid for a particular resource or set of   resources.

      //

      // Populated by the system.

      // Read-only.

      // Value must be treated as opaque by clients and .

      // More info:   https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency

      // +optional

      ResourceVersion string

     

      ......

}

Kube-Apiserver可以通过该字段判断对象是否已经被修改。当包含 ResourceVersion 的更新请求到达 Apiserver,服务器端将对比请求数据与服务器中数据的资源版本号,如果不一致,则表明在本次更新提交时,服务端对象已被修改,此时 Apiserver 将返回冲突错误(409),客户端需重新获取服务端数据,重新修改后再次提交到服务器端。上述并行控制方法可防止如下的 data race:

Client #1: GET Foo

Client #2: GET Foo


Client #1: Set Foo.Bar =   "one"

Client #1: PUT Foo


Client #2: Set Foo.Baz =   "two"

Client #2: PUT Foo

当未采用并发控制时,假设发生如上请求序列,两个客户端同时从服务端获取同一对象Foo(含有Bar、Baz 两个字段),Client#1先将 Bar 字段置成one,其后 Client#2 对 Baz 字段赋值的更新请求到服务端时,将覆盖 Client#1 对 Bar 的修改。反之在对象中添加资源版本字段,同样的请求序列将如下:


Client #1: GET Foo  //初始Foo.ResourceVersion=1

Client #2: GET Foo  //初始Foo.ResourceVersion=1


Client #1: Set Foo.Bar =   "one"

Client #1: PUT Foo  //更新Foo.ResourceVersion=2


Client #2: Set Foo.Baz =   "two"

Client #2: PUT Foo  //返回409冲突

Client#1 更新对象后资源版本号将改变,Client#2 在更新提交时将返回冲突错误(409),此时 Client#2 必须在本地重新获取数据,更新后再提交到服务端。

假设更新请求的对象中未设置 ResourceVersion 值,Kubernetes 将会根据硬改写策略(可配置)决定是否进行硬更新。如果配置为可硬改写,则数据将直接更新并存入 Etcd,反之则返回错误,提示用户必须指定 ResourceVersion。

Kubernetes 中的 Update 和 Patch

Kubernetes 实现了 Update 和 Patch 两个对象更新的方法,两者提供不同的更新操作方式,但冲突判断机制是相同的。

Update

对于 Update,客户端更新请求中包含的是整个 obj 对象,服务器端将对比该请求中的obj对象和服务器端最新obj对象的 ResourceVersion 值。如果相等,则表明未发生冲突,将成功更新整个对象。反之若不相等则返回409冲突错误,Kube-Apiserver 中冲突判断的代码片段如下。

e.Storage.GuaranteedUpdate(ctx,   key...) (runtime.Object, *uint64, error) {

      //   If AllowUnconditionalUpdate() is true and the object specified by

       //   the user does not have a resource version, then we populate it with

        //   the latest version. Else, we check that the version specified by

        //   the user matches the version of latest storage object.

        resourceVersion,   err := e.Storage.Versioner().ObjectResourceVersion(obj)

        if   err != nil {

            return   nil, nil, err

              }

       version, err :=   e.Storage.Versioner().ObjectResourceVersion(existing)

       doUnconditionalUpdate   := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()

       if   doUnconditionalUpdate {

         //   Update the object's resource version to match the latest

         //   storage object's resource version.

          err   = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)

           if   err != nil {

           return   nil, nil, err

          }

       }   else {

        //   Check if the object's resource version matches the latest

         //   resource version.

         ......

            if   resourceVersion != version {

         return   nil, nil, kubeerr.NewConflict(qualifiedResource, name,   fmt.Errorf(OptimisticLockErrorMsg))

         }

              }

         ......

       return   out, creating, nil

}

基本流程为:

  1. 获取当前更新请求中 obj 对象的 ResourceVersion 值,及服务器端最新 obj 对象 (existing) 的 ResourceVersion 值

  2. 如果当前更新请求中 bj 对象的 ResourceVersion 值等于 0,即客户端未设置该值,则判断是否要硬改写 (AllowUnconditionalUpdate),如配置为硬改写策略,将直接更新 obj 对象

  3. 如果当前更新请求中 obj 对象的 ResourceVersion 值不等于 0,则判断两个 ResourceVersion 值是否一致,不一致返回冲突错误 (OptimisticLockErrorMsg)

Patch

相比Update请求包含整个obj对象,Patch请求实现了更细粒度的对象更新操作,其请求中只包含需要更新的字段。例如要更新pod中container的镜像,可使用如下命令:

kubectl patch pod my-pod -p   '{"spec":{"containers":[{"name":"my-container","image":"new-image"}]}}'

服务器端只收到以上的 patch 信息,然后通过如下代码将该 patch 更新到 Etcd 中。

func (p *patcher) patchResource(ctx   context.Context) (runtime.Object, error) {

       p.namespace   = request.NamespaceValue(ctx)

       switch   p.patchType {

       case   types.JSONPatchType, types.MergePatchType:

              p.mechanism   = &jsonPatcher{patcher: p}

       case   types.StrategicMergePatchType:

              schemaReferenceObj,   err := p.unsafeConvertor.ConvertToVersion(p.restPatcher.New(),   p.kind.GroupVersion())

              if   err != nil {

                     return   nil, err

              }

              p.mechanism   = &smpPatcher{patcher: p, schemaReferenceObj: schemaReferenceObj}

       default:

              return   nil, fmt.Errorf("%v: unimplemented patch type", p.patchType)

       }

       p.updatedObjectInfo   = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)

       return   finishRequest(p.timeout, func() (runtime.Object, error) {

              updateObject,   _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo,   p.createValidation, p.updateValidation, false, p.options)

              return   updateObject, updateErr

       })

}

基本流程为:

1.首先判断 patch 的类型,根据类型选择相应的 mechanism

2.利用 DefaultUpdatedObjectInfo 方法将 applyPatch (应用 Patch 的方法)添加到 admission chain 的头部

3.最终还是调用上述 Update 方法执行更新操作

在步骤 2 中将 applyPatch 方法挂到 admission chain 的头部,与 admission 行为相似,applyPatch 方法会将 patch 应用到最新获取的服务器端 obj 上,生成一个已更新的obj,再对该obj继续执行 admission chain 中的 Admit 与 Validate。最终调用的还是 update 方法,因此冲突检测的机制与上述 Update 方法完全一致。

相比 Update,Patch 的主要优势在于客户端不必提供全量的 obj 对象信息。客户端只需以 patch 的方式提交要修改的字段信息,服务器端会将该 patch 数据应用到最新获取的obj中。省略了 Client 端获取、修改再提交全量 obj 的步骤,降低了数据被修改的风险,更大大减小了冲突概率。 由于 Patch 方法在传输效率及冲突概率上都占有绝对优势,目前 Kubernetes 中几乎所有更新操作都采用了 Patch 方法,我们在编写代码时也应该注意使用 Patch 方法。

ResourceVersion 字段在 Kubernetes 中除了用在上述并发控制机制外,还用在 Kubernetes 的 list-watch 机制中。Client 端的 list-watch 分为两个步骤,先 list 取回所有对象,再以增量的方式 watch 后续对象。Client 端在list取回所有对象后,将会把最新对象的 ResourceVersion 作为下一步 watch 操作的起点参数,也即 Kube-Apiserver 以收到的 ResourceVersion 为起始点返回后续数据,保证了 list-watch 中数据的连续性与完整性。