分布式系统的基石:深入浅出共识算法

649 阅读36分钟
原文链接: zhuanlan.zhihu.com

我们将从分布式存储系统面临的一致性问题开始进行讨论,进而比较详细地分析Raft和ZAB两种近年来最受关注的算法。

因为希望尽量涵盖算法的内容,所以文章很长,建议备好啤酒饮料

虽然已经很长,还是不够涵盖Paxos的内容 2333

软件系统面临很多问题

  • 硬件异常:硬盘错误,RAM错误,电源错误,网络错误。随着数据中心扩大,接入的硬件越来越多,硬件出现问题的可能性也越来越大 (1W件99.9999%可用的设备,组合在一起可用性会变为99%,如果是10W件,可用性只剩下90%)
  • 软件异常:即使经过长期实践检验,代码review非常严谨的软件,也难以摆脱bug。更糟糕的是,很多bug是难以通过测试发现的,就像硬件错误一样
  • 人为因素:运维人员的失误。人比起软硬件系统,更加不可靠。即使一个非常专业,也非常专注的运维工程师,也可能误操作,防止一个因失恋而心不在焉的工程师犯错就更加困难了

分布式系统的情况就更加糟糕

网络的本质是节点之间互相发送消息(光/电讯号),其物理本质是异步,单向且无反馈的。即使有TCP/IP协议,仍然要面对许多问题

  • 消息可能丢失
  • 消息可能被缓冲起来,增加了很多延迟
  • 目标节点不可用(可能down机,也可能由于某些原因暂时不能响应,如正在GC)
  • 消息可能被正常接收并处理,但是响应消息不能传达
  • 响应消息可能被延迟
图1

发送方甚至无法确定目标节点是不是完全不可用:

上图列举了(a), (b), (c)三种情况,client完全不能区分接收不到响应的原因

Replication and high availability

分布式系统通常通过replication,partition来提高availability和scalability。两者经常会结合使用

图2

Replication通过将同样的数据复制在不同节点,来达成:

  1. 使系统可以在部分节点异常的情况下继续工作(high availability)
  2. 将数据复制到距离用户更近的数据中心以提高访问速度(low latency)
  3. 拥有更多节点来直接对用户提供服务(scalability)

Partition通过将数据分组存储,来满足scalability

这篇文章我们只讨论high availability和replication,也就是consensus算法发挥作用的地方

quorum

quorum经常被用在replicated系统中,协助保证high availability

原理很简单,如果有n个replica,每个写操作必须被w个replica确认才算完成,则读操作必须读取r个replica:则只要 w + r > n,就能保证读操作可以读到最新的值,这个限制就构成了一个读写操作的quorum

更简单做法是要求w和r都 > n/2,这也是quorum经常被mojarity替代的原因

Consistency guarantees (Consistency models)

Eventual consistency

if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value

解释一下这句话:如果某个数据项没有被更新,最终对该数据项的所有访问将返回它最后一次被更新的值

这是一个非常弱的guarantee。甚至你在同一进程里,对一个值进行写操作后立即读取它都可能读到写操作之前值(read your own write)。虽然Eventual consistency被很多系统用作广告(因为大家意识里它和高可用,快联系在一起),但实际上大部分宣称eventual consistent的系统都会提供更强一些的guarantee(至少是以可选的方式提供)

Linearizability

与之相反的,我们来看一看软件系统能提供的最强的guarantee,linearizability,也叫做atomic consistency, strong consistency, immediate consistency, external consistency

Linearizability并不是理论上最强的,更强的是Strict Consistency,但是Strict Consistency,至少在现代工程和物理的范畴内无法实现

用易理解的方式描述一下,linearizability要求系统表现的像只有一份数据replica,并且所有对数据的操作都是原子的。这样,在一次写操作执行完毕前,任何读操作都读不到新的值,而在写操作执行完成后,任何读操作都会读到新的值,直到另一次写操作完成

做比说难,下面的图说明了一个常用的主从结构,异步复制的replicas cluster下,client可能读取到延迟的数据(常见的mysql,redis集群都采用类似的机制),这样的集群都不满足linearizability

图3

上面的系统中,replica中的数据被用作读缓存,偶尔读到延迟的数据完全可以接收。但是,应用系统需要Linearizable的系统来协助解决问题,例如:

锁:把CAS操作看作获取锁(compare-and-swap。获取锁需要执行两个步骤,检查锁是否可以获取,然后更新锁的持有者,这种操作经常通过CAS只使用一个原子操作完成),锁需要一个严格的linearizable的实现

leader election:利用分布式锁实现leader election是一个常见的用法(例如kafka通过zk实现partition的leader election)

constraints:其中最广为使用的应当是uniqueness constraint,当并发的请求到来时,系统需要linearizable的存储才能判断哪一个请求能成功

解决race condition:我们在图3里展示了一类race condition,显然lineariable的系统可以避免这种情况

在分布式系统中实现Linearizable

单一系统实现linearizable并不复杂,因为操作系统提供了对内存/文件系统的原子操作

consistency models(strict consistency, sequential consistency, week consistency)本来是描述多进程操作系统的(多进程,共享CPU,内存,硬盘等资源),分布式系统面临同样的问题,所以使用了同样的术语

分布式系统可能有如下模型:

  • 单一leader:可能是linearizable的
  • multiple leaders:非linearizable,多主节点不可避免的会引入冲突,因而会表露出不止有一份数据的行为
  • leaderless:多半不是,因为采取leaderless的考量通常是为了能更灵活的平衡性能和一致性
  • consensus algorithms:后面我们将讨论如何基于consensus来实现linearizable

COST of Linearizability

  • 如果你的系统要求linearizability,一旦某些replica与其它replica断开,这些断开的replica就不能再处理请求,它们只能等待网络得以恢复,或者返回错误。不论采取哪种方案,它们都不再可用
  • 如果你的系统不需要linearizability,单个replica就可以独立的处理请求,这样哪怕它断开与其它replica的连接,也可以保持可用,但是整个系统的行为就不可能保持linearizable

这就是著名的CAP理论。不要求linearizability的系统可以提供更高的可用性和低于linearizable,但是高于eventual consistent的一致性。

注意,另一方面,CAP中availability的定义也非常严格,从系统中脱出的replica也要能独立处理请求。通常的数据系统不需要这样的可用性,从而可以考虑在降低一些可用性(例如在多数节点正常的情况下,这些节点可以工作)的情况提供linearizability

比可用性更重要的影响是性能

Linearizable的系统会牺牲很多性能,甚至多核处理器访问RAM都不是linearizable的(由于cache)

现代的数据系统通常会细致分析不同等级的Consistency model,使用低于linearizability的模型来优化性能

Consensus algorithms

分布式系统通过replication保证高可用性,但是我们看到要保证linearizable,多个replica需要保持同步。Consensus algorithm的作用就是让系统的多个节点达成一致。也就是说,通过consensus algorithm,我们可以提供一致,高可用的分布式存储(注意,高可用的定义与CAP不同,一致也不限于linearizability)

最经典的consensus algorithm就是2PC(2-phase-commit)和3PC

我们不细致讨论2PC和3PC,只列举一下它们的问题

2PC是blocking protocol,需要等所有参与者完成。这意味着必须等待最慢的参与者,效率非常低下。同时,依赖broker来协调,broker成为single point of failure

3PC对2PC做了一些改进,但仍然有问题。比如,在一些特例情况下,consensus不能保证

Paxos是正确性得以证明的最著名的consensus algorithm。Paxos不仅经过严谨证明和超过20年的研究,还以其复杂性和难以理解著称。考虑到篇幅的关系,本文不准备对Paxos做很多介绍

本文后续将以Raft和ZAB为实例,对consensus进行分析介绍

RAFT

Raft是当今最受关注的consensus algorithm,它最重要的特点是易于理解和实现(对飙Paxos) Raft采用leader-follower机制,通过replicated log在节点之间同步数据

Leader负责维护replicated log,接收client的请求,并将log复制到其它节点。Leader可能异常(出错或断开),在此情况下将选举出新leader

这样的机制大大简化系统设计,因为leader且只有leader能修改log,写操作就可以变成简单的在log末尾加入新行(Write Ahead Log)。数据同步的方向也变为简单的leader --> follower

这样,RAFT就把consensus的问题就分为三个相对独立,更小更细致的问题:

Leader election:系统启动时,或leader异常时,要选举出一个且只有一个新leader

log replication:leader处理client的请求,并将之转化为log entry,并将log复制到整个集群。其它节点的log都要遵从leader

safety:safety的含义是系统不会给出错误的结果。例如,进程将key a的值设置为1,在没有其它写操作执行前,读取a应该返回1。对于replicated log来说,就是在任意节点上,index相同的log entry应该一致

Raft基础

Raft使用replicated state machine作为基础数据结构,通过replicated log在节点之间同步数据。log的条目(entry)会包含state machine可执行的命令。收到命令的节点在本地state machine执行命令。命令可能如下:set key a to 'a'; update key a to 'b'; delete key a; set key a to 'c' if value(a) = 'b' (写操作,CAS操作……)

Raft集群使用quorum来进行写操作和leader election(注意与我们之前提到的读和写的quorum略有不同,w + e > n 保证选举出的新leader持有最全的log entries)。每个服务器可能处在3种状态:leader,follower,candidate(leader候选)。正常运行时,集群有一个leader,其它节点都是follower。Follower不会主动向系统的其它部分发送消息,只会接收并响应leader和candidate的消息

图5 RAFT node state
图6

RAFT按term划分时间。每一个term都从election开始,到下次election开始前结束。election期间可以有1个或多个candidates尝试成为leader,raft保证每次election只会选出0个或1个leader。如果没有选出leader,term立即结束,开始下一个term

可以看出,term可以被用作逻辑时钟,Raft也确实给term赋予单向增长的term number,作为系统逻辑时钟的一部分

Raft并没有限制服务器间的通讯模式,只要求通讯满足RPC模式(request-response)。Raft主要使用两种RPC:

RequestVote:由candidate在election期间发起
AppendEntries RPC:由leader发起

使用RPC方式通讯也是Raft简化系统的方式。每个请求都要求接收方给出响应。Raft允许消息丢失,请求方在这种情况下需要重试,服务器可以并行发送请求,Raft不要求网络保持RPC消息的顺序

可以看到,Raft对网络的要求非常宽松,不要求必须送达,也不要求保持顺序。这与ZAB形成了一个鲜明的对比。如果读者有了解CvRDT和CmRDT,可以看到这里有相当程度的相似性。这种相似度不是偶然,我会尝试找或写一篇文章分析这种相似的原因

Leader election

Raft使用心跳机制触发leader election。服务器启动后都处在follower状态。如果follower能收到leader或candidate的消息,它就会维持在follower状态。

Leader会定时发送心跳消息给所有follower。如果follower经过一定时间没有收到消息,它就会将自身转化为candidate,启动election(增加当前term number,启动新term。注意,其它follower可能用同样的方式加入这个term,成为term的candidate)

Candidate会投票给自己,然后通过RPC向集群的其它所有服务器发送RequestVote RPC。投票可能有3种结果:

  • Candidate自身获选(超过半数票)
  • 另一个Candidate获选
  • 超时,没有获选者

每个服务器在一个term只能投票给一个candidate,采用先到先得的方式(candidate会投自己),这样,超半数票的要求就限制了一次election只可能选出一个leader。一旦选出新leader,新leader就开始向其它服务器发送心跳,防止新election发生

在等待选举的时候,candidate可能收到AppendEntries RPC,发送方可能是之前断开的leader,也可能是收到多数票的新leader。如果发送方的term number和该candidate一样或更大(更大的原因是选举timeout,又启动了新term),如果小于自身的term number,则candidate拒绝请求,继续等待选举结果

如果同时有多个follower成为candidate,选票可能分散,没有candidate能得到超半数票。Raft会指定一个election timeout,超过这个时间candidate会继续增加term number,启动新选举。新发出的RequestVote会带有更大的term number。

为避免选举得不到结果的情况反复出现,Raft使用一个固定区间内的随机election timeout(好比在一个平均通讯时间10ms的网络里,election timeout在150ms-300ms之间)。这样,大部分情况下只有一个candidate会timeout,并迅速启动/完成新一轮election。

Raft最初考虑采用排榜的方式处理多个candidate分选票的情况。亦即给每个服务器分配一个不同的排名,如果一个candidate收到更高排名的RequestVote,它会放弃candidate身份,回到follower。但是这种方式在某些情况下变得很复杂。Raft最终选择随机超时重试的方式,因为这种方式更易于理解

Log replication

Leader被选出后,就开始处理客户端请求。客户端请求会包含需要服务器执行的命令。Leader将这个命令加入自己日志末尾(新条目会记录当前的term number,以及它在当前term的序号),然后发送AppendEntries RPC给其它服务器。

Leader决定何时log entry可以算作已提交(committed)。Raft保证已提交的日志不会丢失,且最终会复制到所有节点。提交一条日志也意味着这条日志之前的日志都已提交。下面继续说明

  • 如果不同日志(不同服务器上)上的两条记录属于相同的term,且在该term中序号一致,则它们包含相同的内容
  • 如果不同日志里的两条记录term和序号都一致,则它们之前的日志也一致

第一条:因为日志都是由leader产生的,而一个term内只会有一个leader,这就保证了term number+日志序号可以唯一的决定一条日志。leader的日志是WAL,所以顺序不会变化。

第二条:AppendEntries会进行一致性检查。AppendEntries RPC会带有当前term number和上一条日志的序号,如果follower找不到上一条日志,就会拒绝这个请求。这样,一旦follower接收AppendEntries RPC,leader就可以确认该follower的日志与leader完全一致

在Leader异常的情况下,可能出现日志不一致的情况,图7列举了这类情况

图7

图中颜色代表term,数字为term number。follower可能:

  • 缺少entry(a,b)
  • 包含多余的未commit的entry(c,d)
  • 同时缺少entry和包含多余的未commit的entry(e,f)
注,出现不一致的地方都是未commit的entry

为了解决这些不一致的情况,leader需要找到每一个follower和自己log开始不一致的点,删除follower上不一致的log entries,再将自己的log发送给follower

Leader会保有每一个follower上日志相对于自己的位置。这一动作在leader确立后就通过AppendEntries RPC完成。

leader首先根据自己log最后的位置发出AppendEntries RPC,如果follower拒绝(找不到上一个log),leader会将follower的日志位置向后移动一位,再继续请求,直到follower接受请求为止。Follower一旦发现AppendEntries RPC里包含的位置本地也存在,就会删除掉本地多余的entries,接受AppendEntries RPC
Leader可以发送空的AppendEntries来节省带宽,直到确认follower上日志位置后再发送数据
Follower也可以在响应中加入本地日志的信息,避免Leader一位一位的尝试
实践中,Raft回避了这些优化。因为这些不一致的情况都不易出现,这些优化带来的复杂度很大,但性能上的收益非常小
可以看到,这个机制非常简单,Raft认为大部分情况下按照此机制实现就足够现实需求。当然,也可以按需进一步优化

AppendEntries RPC的机制简化了保持日志同步的麻烦。Leader只需正常的尝试AppendEntries RPC,所有服务器的日志就会很快保持一致。

与follower不同,leader不会覆盖或删除自己的日志(WAL)。这样可以确保已经一致的follower只会落后,不会再变得不一致

Safety

Raft还需要一定的机制来保证每次leader election选出的新leader一定要包含最近的committed log。这样,committed log才不会被改写

任何consensus algorithm都要求leader最终保有全部committed log entries。但有些算法允许选举出缺少committed entries的leader,然后再将缺少的部分同步给leader。

Raft要求只有包含上一个term全部committed entries的follower才能被选为新leader,这就避免了很多复杂度

这一机制包含在election的过程中。一个candidate必须联系过半数节点,这些节点中必然有至少一个包含全部committed log entry。RequestVote RPC会包含candidate上最后一条log的信息,接收的节点如果发现自身的日志更up-to-date,就会拒绝RequestVote。因为必然有过半数节点包含最近的committed log,所以如果一个candidate没有最近的committed log,就不会比这些节点更up-to-date,也就不能获选。

up-to-date通过比较最近term里最后一个log entry的term number和序号来判断。如果最后term number不同,则term number比较大的更up-to-date;如果term number相当,则log entry序号更大的更up-to-date

注意:虽然在一个term内,可以通过对AppendEntries RPC计数来判断该term内产生的log entry是否已commit,这样的方法不能用在commit上一个term的entry里,这种情况可以参见图8

图8

a)S1是leader,正在复制entry 2 (term 1)

b)S1异常,S5被选举为新leader,添加了entry 3 (term 2)

c)S5异常,S1恢复并被选举为新leader,开始继续复制entry 2。(term 3)注意S3虽然复制到了entry 2,它最后的entry仍然在term 1

d1)S1又异常。即使entry 2被复制到多数节点,S5仍然可能被选为新leader,因为S2/S3会认为自己的日志比较落后,这样就会走向d1的情况,被多数节点复制的entry 2最终被抹掉

d2)如果S1没有异常,entry 4在term 3中被复制到多数节点(committed),entry 2也被commit。因为AppendEntries的机制,commit一个entry时,leader之前的所有entry都会被commit

这里有一些额外的复杂度,根源在于Raft会保持每个log entry的term number。一些其它的算法可能会给从前一个leader带来的log entry提供新的term number(类比Raft的术语)。Raft希望保持log的结构更容易理解

Follower & Candidate异常

很容易处理,Raft会无限重试RPC

Raft RPC是幂等的,所以没有错误会因此产生

Log compaction

随着运行时间增加,会有越来越多操作被记录到日志中。需要某种方案来压缩日志

压缩日志的思路是将不需要的日志删除。例如如果日志中出现 set x to 2,那么之前的set x to 1就可以删除。

Raft压缩日志需要各个节点的state machine配合完成,而Raft并没有规定state machine的具体实现。所以Raft只给出了日志压缩的建议

我们不细致讨论这些方案,只大致列举:

  • 使用Snapshot。将当前的整个系统状态写入稳定的存储,然后删除之前的所有log
  • 如果state machine是持久化的,系统状态本身就写在硬盘上。Raft log可以在被执行完后立即丢弃。
  • 使用log cleanning,log-structured merge tree等数据结构

Raft建议日志压缩时采用一些核心思路:

压缩日志由各个节点独立完成,不通过leader统一做决定。这样可以避免日志压缩受到Raft的算法影响,增加更多复杂度。(在很小的state machine的情况下,由leader完成压缩再发送给follower可能更好)

压缩日志后,节点还需要保存一些信息,不能影响到AppendEntries RPC

因为节点自主压缩了日志,节点可能需要将状态存储起来以备重启时使用。同时,还要考虑节点长时间断开,日志落后很多时的情况(可能需要通过leader发送snapshot来跟上)

Membership changes

动态添加/删除节点

本文不细致讨论Raft具体如何支持membership changes。只概括一下思路

添加/删除节点时,系统需要顾及到safety(不要出现错误的响应)和availability

下文中,我们把节点的状况也称为配置(configuration),添加/删除节点的操作就包括提供服务器,和使用新配置Cnew替换旧配置Cold

先介绍常见的做法:

系统首先切换到一个临时状态,leader将临时状态提交到集群,然后切换到新状态。临时状态是Cold和Cnew的组合(同时要求两种多数票)

此时,新的log entry只有同时复制到满足Cold和Cnew的多数节点才算committed

现在,系统的所有操作需要同时满足Cold和Cnew。leader继续提交Cnew状态给集群,提交完成后,Cold配置对系统已无关紧要

Raft在实现此方案的过程中,发现了更简单,不依赖临时状态的方案,每次更改配置时,只能增加或删除一个节点。更复杂的配置变更可以通过一系列增/删单个节点的操作完成

Raft比较推荐使用后一方案,实现会简化,操作复杂度和效率都不会有明显降低

只增/删一个节点不需要中间状态的根源在于:只有一个节点变化的情况下,新旧配置的多数节点必然会有重合。新配置生效后,旧配置自然无法无法再形成多数票

添加/删除节点还意味着需要rebalance clients。我们也不细致分析,只提示存在此问题

Raft Client

发现raft服务

由于raft集群可能动态变换leader,member,通过简单的静态配置就不太可行。可以通过使用外部目录服务(如DNS),当然,需要随着系统状态的变化更新目录服务数据

routing to leader

Raft是leader based,所以client的请求需要转发给leader。Client启动时会随机连接到一台服务器上。之后,可以由该服务器返回leader地址给client,client重连到leader,也可以由服务器向leader转发client的请求

无论那种情况,raft都需要避免服务器上leader信息过期导致client请求被无限延迟。而不论leader,follower还是client,都可能存储过期的leader信息

leader:服务器可能处在leader状态下,但不是集群当前的leader。这样,该服务器即无法处理client请求,也不能返回正确的leader地址给client。这种情况下,raft通过election timeout进行控制,如果leader在一个election timeout内无法完成集群多数节点的heartbeat,它就会退出leader状态。client可以连接到其它服务器进行重试

follower:follower无法连接到leader时会启动新的election,在新leader选出后follower会更新leader信息。

Client:Client失去连接后(可能是leader退出leader状态,follower拒绝响应,等等情况),会随机尝试另一服务器,而不是尝试重连

linearizable

client和服务器间的通讯是at-least-once模式。client会在请求没有得到响应的情况下重试,但是我们之前看到,网络环境下没有响应的原因是无法判断的。重试的请求可能已经被正常处理过。这一问题通过session来解决。Client会为session内的请求进行序列编号,session会跟踪最近处理过的序列号,如果请求已经被处理过,服务器会跳过执行,直接返回结果

client会在每一个请求里告知服务器它尚未接到响应的请求中最小的编号。服务器由此得知哪些请求已经响应成功不会再被重试,可以在session中清理掉
注意:session会在每一个服务器上为每一个client创建。虽然请求会通过leader写入log,但是每一个follower会单独对本地的state machine进行处理

服务器总是需要session过期的问题,这里又出现了需要服务器达成一致的情况,如果过期的时间不一致,有些服务器可能会再次处理重复的消息,导致系统状态不一致。可以有各种做法:

LRU policy:每个服务器只维护一定数量的session,数量达到就开始移除
根据过期时间:注意服务器不能单独按本地时间去过期session,这一策略需要根据leader通过log entry传递的时间来实现

在client session过期时,可能出edge case:client在session过期后继续发送的请求。如果给client分配新session,会无法处理重复请求的问题。Raft的参考实现LogCabin对这种情况会直接返回错误,client进而直接crush。

可以看到,这种机制本身不支持无限scale client。

read-only requests

如果只读请求可以不通过raft log(无需consensus),就可以非常快速的处理。

但是,如果绕过raft log,可能会导致读取过期数据。

有一些手段可以在保持linearizable的情况下绕过raft log:

  1. leader会保有自身log中committed部分的序号。如果只读请求到来时,leader尚未确定提交任何一个entry(刚刚成为leader),就等到有entry提交。一旦有entry提交完成,raft就保证了leader上提交的entry和任意一个follower一样up to date。
  2. leader把这个提交过的entry的序号作为readIndex。发送到leader的只读请求可以从这个index的位置构建出的状态读取
  3. leader还不能直接使用readindex来响应只读请求,因为还需要明确它没有被新的leader替代。这个检查通过一轮心跳检查完成(一轮心跳可以为多个readonly request服务)
  4. leader等待自身的状态机执行完readIndex,然后根据状态机响应只读请求

follower可以通过像leader请求当前的readIndex,然后执行第4步来响应只读请求

还有一些更近一步的手段可以优化只读的请求,我们这里不再细致讨论。思路是一致的,只要处理请求的服务器能够确定最新的committed index,就可以根据执行到这一步的状态机返回结果而不破话linearizability

Total Order broadcast (Atomic broadcast)

total order broadcast也被归为consensus algorithms,虽然zookeeper官方有强调ZAB不是consensus algorithm。下面将对其简要介绍,并最终介绍ZAB算法(Zookeeper atomic broadcast)

Ordering,Causality

再次以图3展示的简单情况来讨论

图3,再展示一次

尽管网络消息本质上是无序/混乱的,各个参与者的操作和系统事件之间是有一定因果关系的

图中,Follower 1必须在收到leader的消息后才能把新结果的消息发给Alice,Referee也只有发送消息到leader后才可能收到更新成功的ok消息

Alice和Bob看到不同结果,是因果关系被破坏的一个反映(其原因是cross channel,亦即Alice和Bob之间有两个不同的消息渠道--一个通过系统,一个通过电话。本文不做细致讨论)

Causality隐含了消息/事件的顺序关系:因在果前;消息发送在接收前;响应发生在请求后。系统中有因果关系的事件会形成链条:某个节点读取数据,进行一些计算,然后把结果保存下来;另一个节点读取并根据这些结果写入一些其它的结果。这些有因果关系的操作就定义了系统中发生的事件的因果关系

保证消息的处理遵循这些因果关系的系统,就是causally consistent的系统。这是一个比Linearizable弱的consistency model,但是又比eventual consistency强。

我们之前从数据存储的角度讨论了linearizable的系统,现在来看一看Ordering(对系统事件的排序)和consistency model的关系

Linearizability
在linearizable系统内,所有操作可以有一个统一的排序:既然系统的行为像是只有一个数据replica,所有的操作又是原子的,那样任何两个作用在这个唯一replica上的操作总是有先后顺序的
-- 比较容易可以看出,Linearizability实际上隐含了Causally consistent。
Causality
Causally consistent的系统内不要求所有操作都有先后顺序,有些操作没有因果关系,因而无法比较先后顺序,没有因果关系的操作/事件就叫做并发操作/事件
我们已经看到为实现linearizability,Raft需要很多网络通讯。事实上,Causally consistent是一个分布式系统在不牺牲性能的情况下能实现的最强consistency guarantee(CAP不再适用,因为我们放弃了linearizability)。本文对此不做进一步分析

捕获Causality

我们需要一些手段来记录系统中事件的因果关系,来识别有因果关系的操作和并发操作。很容易想象,可以用时间戳来记录事件的先后顺序。

很遗憾,大部分情况下这是不可行的。操作系统的时间并不可靠,总是会在一段时间后偏离正确的时间。所以我们有许多时间服务器供世界各地的操作系统随时进行同步。但是系统总会在某些时段返回给应用程序错乱的时间。VM/Container的广泛应用带来更多时间上的问题,VM可能会在某些时候整个被停住,来调度硬件资源

分布式系统通常会适用逻辑时钟来代替物理时钟。我们常用的乐观锁机制就是在数据里加入只增长的版本号,写操作会先读取当前版本号,确保与之前读取的版本号一致才完成写入,如果当前版本号高于写操作之前的版本,就发现了并发操作(为了避免引入事务,nosql数据库通常会提供CAS-compare and swap-操作,可以将对比版本和写入算作一个原子操作)

更泛化的逻辑时钟包括lamport timestamp,vector clock,它们都可以用来识别并发操作,以及为因果顺序的操作排序。我们这里不详细讨论

Total order broadcast (Atomic broadcast)

顾名思义,total order broadcast包括两部分,一是要对系统内的操作进行全局顺序编排,二是要将操作按编排好的顺序发送到系统的各个节点

Reliable delivery
No messages are lost: if a message is delivered to one node, it is delivered to all nodes.
Totally ordered delivery
Messages are delivered to every node in the same order.

由于这样的特性,total order broadcast可以被用来实现linearizable存储

写操作可以被视为添加一个消息,系统的每个节点都会按同样的顺序收到所有的写操作。读操作也将被视为添加一个消息,执行读操作的节点将在自身收到这个读操作消息后再返回响应,这样,系统中任何节点都不会返回过期的数据

我们前面讨论过的unique constraint,leader election等也可以通过类似的方式实现,例如:Unique constraint可以通过先添加一个写操作消息,然后等待这个消息返回到执行节点,因为所有在它之前的消息都会被发送过来,执行的节点就可以判断新写入的值是否已经被占用。如果被占用,就对客户端返回失败,如果没有,就成功写入(注意,这些操作体现了linearizability影响性能)

反过来,linearizable存储也可以用来实现total order broadcast:

将linearizable存储的每一次数据变化都顺序生成一个版本号,然后将变化内容和版本号一起发送给系统的所有节点。注意这里版本号必须是连续的,这样接收的节点才知道有时需要等待空缺的版本送达

ZAB (Zookeeper atomic broadcast)

ZAB也是leader-based consensus algorithm,所以与Raft有很多相似点。ZAB允许没有最近提交日志的节点被选为leader,在election完成后再对日志的不一致进行修订(具体算法是可替换的)。

ZAB提供比Raft强一些的guarantee:client可以pipeline一些列请求,ZAB保证这些请求会被顺序执行(或者全部不执行)。这可以用来:申请锁-执行一些列操作-释放锁。其它客户端可以按此顺序观察到执行结果。(Raft同样可以用来提供这样的机制,但需要额外的工作处理客户端的重试)

我们看到了total order broadcast可以用来实现linearizable存储,而linearizable存储可以用作分布式锁,leader election(distribute coordinator)等等功能,这正是Zookeeper的目标

那么ZK是如何实现total order broadcast(atomic broadcast)的呢?

通过它的linearizable存储。

当然这是玩笑,ZK需要的是用atomic broadcast来实现它的linearizable存储。

ZAB是leader-based,client可以连接到一个或多个节点。节点会将client的请求转发到leader(如果只读会直接处理,通过sync()操作保证linearizable),leader执行请求,然后将状态变化发送到followers

注意这里,leader会直接执行未commit的操作,然后发送状态变化给follower,follower直接应用状态变化,不会执行操作(根本就没不知道操作是什么)
而Raft会先将操作发送给follower,待commit之后再执行到本地状态,follower各自执行操作
这两种机制分别称为replicated state machine和primary backup。(就像event sourcing中的command,event)各有利弊,但是显然,ZAB无法用Raft类似的机制,根据readIndex直接提供linearizable的只读响应

Raft将自身实现为replicated log,ZAB的机制相似,但是使用了不同术语描述。Raft的term在ZAB中被称为epoch,leader上每一次执行操作导致的状态变化被称为transaction(记得ZK可以pipeline请求),类似Raft的log entry, transaction也有一个id被称为zxid,其值与Raft相同,epoch序列号和epoch内的transaction序列号

概览

ZK通过TCP协议传输消息,这样就不需要自己处理一些网络底层的问题,ZK同样通过heartbeat来检测错误的节点

准确的说,TCP协议帮助ZK实现两个目标:

一个FIFO channel,传递的消息可以保证顺序
FIFO channel关闭后不会再收到消息

Raft没有要求使用TCP通道,完全可以有UDP方式的实现。但是,保证消息传递顺序的通道对Raft性能的优化至关重要(AppendEntries会顺序到达,不需要无止境的重试,等待……)

ZK集群存在两个生命周期的阶段:

  1. leader activation 选举出一个leader,并且保证leader上正确记录系统当前的状态
  2. active messaging leader开始接收并处理请求

Leader Activation

当新leader被选出后,其它服务器将连接到新leader,并开始接收自己丢失的请求。如果一个服务器缺少太多请求(断线时间太久,或者刚刚加入集群),leader会把当前的整个数据状态作为一个snapshot发送给这个服务器(可以看到ZK总共能存储的数据是受限的,并没有引入类似kafka broker的partition机制)

新leader根据自己最大的zxid来确定下一个zxid,并用这个zxid发送NEW_LEADER请求给其它服务器,这个NEW_LEADER请求被集群接受后leader activation才算作完成

Active messaging

一旦leader被选出,处理请求的过程就比较简单

ZK默认使用majority quorums,亦即3个节点中的2个,4个节点中的3个(所以通常使用奇数节点),但也支持一些奇怪的用法,例如,如果你有5台服务器,其中3台性能远强于另外两台,可以给这3台分为一组,赋予更高的优先级(一票算两票)

ZAB的4个阶段

ZK的两个阶段是一个比较粗糙的划分,集群的节点实际上可能处在4个阶段

  • election (phase 0)
  • discovery (phase 1)
  • synchronization (phase 2)
  • broadcast (phase 3)

当多数节点处在前三个阶段时,ZK集群也就处在leader activation阶段。和Raft一样,此阶段集群不会处理请求,主要处理新leader的选择和保证已提交的数据都转移到新leader上

节点可能有三种状态:following,leading,election

Phase 0: Leader election 节点在此阶段处在election状态。具体选择算法可以变化(后续介绍默认算法FLE),每个投票的节点只要选出一个可能得到多数票的leader。被选的节点称为投票节点的prospective leader。phase 3将在prospective leader中确认leader。若节点投票给自己,则进入leading状态,否则进入following

Phase 1: Discovery Follower创建到prospective leader的follower-leader连接,并发送自身最近的epoch和transaction信息,prospective leader确认follower接收过的最近的transaction,找到具备最近transaction的节点(与raft判断up-to-date的方式一致,先比较epoch,epoch相同再比较zxid),将此节点的历史记录同步到本地,并据此创建新的epoch。如果follower尝试连接following状态的节点(没有给自己投票的节点),连接将被拒绝,follower退回Phase 0

注意Raft采用了特定的的投票机制,每个节点投票给自己,且在Phase 0就确认被选的leader必须具备most up-to-date的log entry,从而避免了这一步

Phase 2: Synchronization leader将本地历史记录的transaction发送给follower,一旦过半数follower确认接收,prospective leader发送提交消息给这些follower,此时,prospective leader被确认为leader

Phase 3: Broadcast 此阶段开始正常处理请求。

图4

很像经典的2-phase commit,但是无需处理abort的情况

注意所有的通讯都通过FIFO channel(TCP connection),如果连接断开,重连时会根据zxid在断开的点上继续

  • leader向所有follower发送propose的顺序一致,也与leader自身接收请求的顺序一致
  • follower处理propose的顺序与接收的顺序一致,因而返回ACK给leader的顺序也与propose顺序一致(act like no concurrency)
  • 一旦多数ACK被leader收到,leader会发送COMMIT给全部follower。COMMIT的顺序与ACK,propose的顺序都一致
  • COMMIT是通讯的重要部分,也会按顺序执行

在Phase 3,follower可以继续加入该epoch(因为phase 2只需要quorum即可进入phase 3,有些节点可能会被留下)

与Raft相同,如果leader不能收到quorum的heartbeat,或者follower不能收到leader的heartbeat,就会回到phase 0

Fast Leader Election

FLE尝试选出具备全部committed transaction的节点,这样就可以省去后续Phase 1的工作

思路大致是在所有节点之间建立联系,互相通知每个节点的投票信息。每个节点初始都投给自己,再收到其它节点的投票信息后,查看该投票的对象是否可能比自己有更近的committed transaction,然后根据这个情况更新自己的投票。具体算法比较复杂,这里不再细述

FLE达成的结果和Raft election一致

参考资料:

Design data intensive applications dataintensive.net/

Consensus: Bridging theory and practice

ZooKeeper's atomic broadcast protocol: theory and practice www.tcs.hut.fi/Studies/T-7…

Consul: consule.io

Zookeeper: zookeeper.apache.org

etcd: coreos.com/etcd/