阅读 918

Zookeeper三万字笔记

ZK简介

zk是什么

zookeeper是一个高性能、开源的分布式应用协调服务,提供了简单原始的功能,分布式应用可以基于它实现更高级的服务,比如实现同步(分布式锁)、配置管理、集群管理。它被设计为易于变成,使用文件系统目录树作为数据模型。服务端使用Java语言编写,并且提供了Java和C语言的客户端。

zk的数据模型

zk是一种分层的树形结构

  1. 树形结构中每个节点称为Znode
  2. 每个Znode都可以有数据(byte[]类型),也可以有子节点
  3. Znode的路径使用斜线分割,例如/Zoo/Duck,zk中没有相对路径的说法,即所有节点的路径都要携程绝对路径的方式。
  4. 当zk中节点的数据发生变化时,版本号会递增。
  5. 可以对Znode中的数据进行读写操作。

zk的应用场景

数据发布/订阅

数据发布/订阅即所谓的配置中心:发布者将数据发布到zk的一个或一些列节点上,订阅者进行数据订阅,可以即时得到数据的变化通知。

应用A将数据发布到zkServer的某个节点上,应用B和C会先在zkServer上注册监听该节点的watcher(相当于listener,基于RPC实现),一旦该节点有数据变化,B和C上的watcher变化得到通知,继而从zkServer上获取最新的数据。

负载均衡

zk实现负载均衡本质上是利用zk的配置管理功能,实现负载均衡的步骤:

  1. 服务提供者把自己的域名及IP端口映射注册到zk中。
  2. 服务消费者通过域名从zk中获取到对应的IP及端口,这里的IP及端口可能有多个,只是获取其中一个。
  3. 当服务提供者宕机时,对应的域名与IP的对应就会减少一个映射。
  4. 阿里的dubbo服务框架就是基于zk实现服务路由和负载。

分布式协调/通知

通过zk的watcher和通知机制实现分布式锁和分布式事务

集群管理

获取当前集群中机器的数量、集群中机器的运行状态、集群中节点的上下线操作、集群节点的统一配置等。

基本概念

集群角色

  1. Leader:为客户端提供读写服务。
  2. Follower:为客户端提供读服务,客户端到Follower的写请求会转交给Leader角色,Follower会参与Leader的选举。
  3. Observer:为客户端提供读服务,不参与Leader的选举,一般是为了增强zk集群的读请求并发能力。

会话

  1. session是客户端与zk服务端之间建立的长连接。
  2. zk在一个会话中进行心跳检测来感知客户端链接的存活。
  3. zk客户端在一个会话中接收来自服务端的watch事件通知。
  4. zk可以给会话设置超时时间。

zk的数据节点

1、Znode是zk树形结构中的数据节点,用于存储数据 2、Znode分为持久节点和临时节点两种类型: 持久节点:一旦创建,除非主动调用删除操作,否则一直存储在zk上 临时节点:与客户端会话绑定,一旦客户端失效,这个客户端创建的所有临时节点都会被删除 3、可以为持久节点或临时节点设置Sequential属性,如果设置该属性则会自动在该节点名称后追加一个整形数字。

zk中的版本

zk中由三种类型的版本 1、version:代表当前Znode的版本 2、Cversion:代表当前Znode的子节点的版本,子节点发生变化时会增加该版本号的值 3、Aversion:代表当前Znode的ACL(访问控制)的版本,修改节点的访问控制权限会增加该版本号的值。

zk的watcher

watcher监听在Znode的节点上,当节点的数据更新或子节点的状态发生变化都会使客户端的watcher得到通知

zk中的ACL(访问控制)

类似Linux/Unix的权限控制,有以下几种控制访问权限: 1、CREATE:创建子节点的权限 2、DELETE:删除子节点的权限 3、READ:获取节点数据和子节点列表的权限 4、WRITE:更新节点数据的权限 5、ADMIN:设置节点ACL的权限

zk的基本操作命令

1、帮助命令 help 2、ls path [watch] 其中path指定数据节点的路径,加上watch参数表示监听path路径下所有子节点的变化。 ls命令的作用是列出指定节点下的所有子节点,ls只能查看第一级的所有子节点。

3、create [-s] [-e] path data acl命令 该命令的作用是创建zk节点,-s代表创建的节点具有顺序的属性,-e表示创建的是临时节点,默认情况下创建的是持久节点,path是节点的全路径,data为创建节点中的数据,acl用来进行权限控制, 默认情况下不做任何权限控制。

4、get path [watch]命令 获取path节点的数据内容和属性信息,watch选项作用同ls命令

5、set path data [version]命令 该命令的作用是更新path路径节点的数据内容,data为更新的数据,version为指定数据被更新的版本,如果version比当前的dataVersion还小会报错。

6、delete path [version]命令 删除路径为path的节点,version指定被删除数据的版本,一般不指定,表示删除最新的数据版本,若version为旧的版本则会报错。

Znode的访问控制

ACL全称是Access Control List,访问控制列表,zk中ACL由三部分组成,即Scheme:id:permission。其中: 1、scheme是验证过程中使用的检验策略 2、id是权限被赋予的对象,比如ip或某个用户 3、permission为可以操作的权限,有五个值:crdwa,分别表示create/read/delete/write/admin,具体含义前面描述过。 通过setAcl path acl命令可以设置节点的访问权限,path是节点路径,acl是要设置的权限。通过getAcl path可以查看节点的权限信息。需要注意节点的acl不具有继承关系。 权限检验策略即scheme有五种类型:world/auth/digest/IP/super,下面一一描述:

world检验策略

ACL格式: world:anyone:permission 检测策略为world则id固定位为anyone,如果permission为crdwa则表示任何用户都有创建子节点、读取节点数据、删除子节点、更新子节点数据和设置节点ACL权限的权限。 创建新节点的ACL默认为:world:anyone:crdwa

auth检验策略

ACl格式:auth:id:permission 比如auth:username:password:crdwa,auth检验策略表示给认证通过的所有用户设置acl权限。 可以通过addauth digest :命令添加用户。如果通过addauth创建多组用户和密码,当使用setAcl修改权限时,所有的用户和密码的权限都会跟着修改, 通过addauth新创建的用户和密码组需要重新调用setAcl才会加入到权限组当中去。

总结起来就是:auth检验策略下setAcl会设置当前会话所有拥有的所有用户访问节点的权限,当前会话新添加的用户需要重新设置节点的ACL权限才会把新用户对节点的操作权限加上去。

digest检验策略

ACL格式:digest:id:permission digest和auth类似,只不过digest格式中的id需要使用sha1进行加密,zk已经为我们提供了相关加密的类。如下是为id加密的代码:

public class DigestTest {
    public static void main(String[] args) {
        try {
            System.out.println(DigestAuthenticationProvider.generateDigest("acluser1:111111"));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
复制代码

示例:

=========================客户端1上的操作======================
[zk: localhost:2181(CONNECTED) 86] create /acltest acltestdata  // 添加测试节点
Node already exists: /acltest
// 添加三个用户
[zk: localhost:2181(CONNECTED) 87] addauth digest acluser1:111111
[zk: localhost:2181(CONNECTED) 88] addauth digest acluser2:222222
[zk: localhost:2181(CONNECTED) 89] addauth digest acluser3:333333
// 设置digest类型的acl权限
[zk: localhost:2181(CONNECTED) 90] setAcl /acltest digest:acluser1:hHUVzmra9P/TbXlP/4jRhG9jZm8=:crdwa
cZxid = 0x7c
ctime = Fri Sep 14 18:42:17 PDT 2018
mZxid = 0x7c
mtime = Fri Sep 14 18:42:17 PDT 2018
pZxid = 0x7c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
// 发现只有acluser1用户具有对/acltest节点的crdwa权限
[zk: localhost:2181(CONNECTED) 91] getAcl /acltest
'digest,'acluser1:hHUVzmra9P/TbXlP/4jRhG9jZm8=
: cdrwa

======================客户端2上的操作=======================
// 只添加acluser2用户,发现没有权限读取/acltest节点
[zk: localhost:2181(CONNECTED) 14] addauth digest acluser2:222222
[zk: localhost:2181(CONNECTED) 15] get /acltest
Authentication is not valid : /acltest
// 添加acluser1后,能够读取/acltest节点
[zk: localhost:2181(CONNECTED) 16] addauth digest acluser1:111111
[zk: localhost:2181(CONNECTED) 17] get /acltest                  
acltestdata
cZxid = 0x7c
ctime = Fri Sep 14 18:42:17 PDT 2018
mZxid = 0x7c
mtime = Fri Sep 14 18:42:17 PDT 2018
pZxid = 0x7c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

复制代码

小结:调用setAcl时不像auth那样会设置当前会话中所有用户访问节点的权限,只会设置指定的单个用户对节点的访问权限。setAcl设置中id需要使用sha1进行加密。

IP检验策略

ACL格式:ip:id:permission id是ip地址,指定某个ip地址可以访问 示例:

[zk: localhost:2181(CONNECTED) 96] create -e /acltest acltestdata
Created /acltest
// 设置只有192.168.1.1这台机器可以访问
[zk: localhost:2181(CONNECTED) 97] setAcl /acltest ip:192.168.1.1:crdwa
cZxid = 0x85
ctime = Fri Sep 14 18:59:23 PDT 2018
mZxid = 0x85
mtime = Fri Sep 14 18:59:23 PDT 2018
pZxid = 0x85
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x100063d34d5000b
dataLength = 11
numChildren = 0
// 本机地址为127.0.0.1,无法访问
[zk: localhost:2181(CONNECTED) 98] get /acltest
Authentication is not valid : /acltest
复制代码

super检验策略

主要提供给运维人员维护节点使用,acl中执行的用户具有任何操作任何节点的权限,启动时需要在启动脚本配置如下参数:

-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:015uTByzA4zSglcmseJsxTo7n3c=
复制代码

watcher的使用和原理

watcher解决的问题

应用服务器集群可能存在两个问题: 1、因为集群中有很多机器,当某个通用的配置发生变化后,怎么自动让所有服务器的配置同一生效? 2、当集群中某个节点宕机,如何让集群中的其他节点知道? 为了解决这两个问题,zk引入了watcher机制来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

watcher基本原理

zk实现watcher需要三个部分:分别是zk服务端,zk客户端和客户端的watchManager。如图所示,客户端向zk注册watcher的同时,会将客户端的watcher对象存储在客户端的watchManager中。zk服务器触发watch事件后,会向客户端发送通知,客户端线程从watchManager中取出对应watcher执行。

客户端如何实现事件通知的动作

客户端只需定义一个类实现org.apache.zookeeper.Watcher接口并实现接口的如下方法:

abstract public void process(WatchedEvent event);
复制代码

即可在得到通知后执行相应的动作。参数org.apache.zookeeper.WatchedEvent是zk服务端传来的事件,有三个成员:

final private KeeperState keeperState; // 通知状态
final private EventType eventType; //事件类型
private String path ;//那个节点发生的事件
复制代码

keeperState是个枚举对象,代表客户端和zk服务端的链接状态,定义如下:

/**
 * Enumeration of states the ZooKeeper may be at the event
 */
 public enum KeeperState {
      /** Unused, this state is never generated by the server */
      @Deprecated
      Unknown (-1),

      /** The client is in the disconnected state - it is not connected
       * to any server in the ensemble. */
       Disconnected (0),

      /** Unused, this state is never generated by the server */
       @Deprecated
       NoSyncConnected (1),

     /** The client is in the connected state - it is connected
      * to a server in the ensemble (one of the servers specified
      * in the host connection parameter during ZooKeeper client
      * creation). 
      * /
      SyncConnected (3),

      /**
       * Auth failed state
       */
       AuthFailed (4),

      /**
       * The client is connected to a read-only server, that is the
       * server which is not currently connected to the majority.
       * The only operations allowed after receiving this state is
       * read operations.
       * This state is generated for read-only clients only since
       * read/write clients aren't allowed to connect to r/o servers.
       */
       ConnectedReadOnly (5),

       /**
        * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
        * so that they can perform Zookeeper actions with their SASL-authorized permissions.
        */
        SaslAuthenticated(6),

       /** The serving cluster has expired this session. The ZooKeeper
        * client connection (the session) is no longer valid. You must
        * create a new client connection (instantiate a new ZooKeeper
        * instance) if you with to access the ensemble. 
        */
        Expired (-112);

        private final int intValue;     // Integer representation of value
                                        // for sending over wire

        KeeperState(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        public static KeeperState fromInt(int intValue) {
              switch(intValue) {
                  case   -1: return KeeperState.Unknown;
                  case    0: return KeeperState.Disconnected;
                  case    1: return KeeperState.NoSyncConnected;
                  case    3: return KeeperState.SyncConnected;
                  case    4: return KeeperState.AuthFailed;
                  case    5: return KeeperState.ConnectedReadOnly;
                  case    6: return KeeperState.SaslAuthenticated;
                  case -112: return KeeperState.Expired;

                  default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
               }
        }
 }
复制代码

eventType也是个枚举类型,代表节点发生的事件类型,比如创建新的子节点、改变节点数据等。定义如下:

/**
 * Enumeration of types of events that may occur on the ZooKeeper
 */
 public enum EventType {
       None (-1),
       NodeCreated (1),
       NodeDeleted (2),
       NodeDataChanged (3),
       NodeChildrenChanged (4),
       DataWatchRemoved (5),
       ChildWatchRemoved (6);

       private final int intValue;     // Integer representation of value
                                       // for sending over wire

       EventType(int intValue) {
            this.intValue = intValue;
       }

       public int getIntValue() {
            return intValue;
       }

       public static EventType fromInt(int intValue) {
            switch(intValue) {
                case -1: return EventType.None;
                case  1: return EventType.NodeCreated;
                case  2: return EventType.NodeDeleted;
                case  3: return EventType.NodeDataChanged;
                case  4: return EventType.NodeChildrenChanged;
                case  5: return EventType.DataWatchRemoved;
                case  6: return EventType.ChildWatchRemoved;

                default:
                         throw new RuntimeException("Invalid integer value for conversion to EventType");
            }
       }           
}
复制代码

keeperState和eventType对应关系如下所示:

对于NodeDataChanged事件:无论节点数据发生变化还是数据版本发生变化都会触发(即使被更新数据与新数据一样,数据版本都会发生变化)。 对于NodeChildrenChanged事件:新增和删除子节点会触发该事件类型。 需要注意的是:WatchedEvent只是事件相关的通知,并没有对应数据节点的原始数据内容和变更后的新数据内容,因此如果需要知道变更前的数据或变更后的新数据, 需要业务保存更新前的数据和调用接口获取新的数据。

如何注册watcher

watcher注册api

可以在创建zk客户端实例的时候注册watcher(构造方法中注册watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
复制代码

zk的构造方法中传入的watcher将会作为整个zk会话期间的默认watcher,该watcher会一直保存为客户端ZKWatchManager的defaultWatcher成员,如果有其他的设置,这个watcher会被覆盖。 除了可以通过ZK的类的构造方法注册watcher之外,还可以通过zk类中其他一些api来注册watcher,只不过这些api注册的watcher就不是默认的watcher了。

public List<String> getChildren(final String path, Watcher watcher)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public List<String> getChildren(String path, boolean watch)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public byte[] getData(String path, boolean watch, Stat stat)
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public Stat exists(String path, boolean watch)
public Stat exists(final String path, Watcher watcher)
复制代码

watcher注册示例代码

本示例中使用zk自带客户端演示watcher的使用,zk自带客户端有一点需要注意:Watcher设置后,一旦触发一次就会失效,如果需要一直监听,则需要再注册。 定义默认watcher:

/**
 * 测试默认watcher
 */
public class DefaultWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {

        System.out.println("==========DefaultWatcher start==============");

        System.out.println("DefaultWatcher state: " + event.getState().name());

        System.out.println("DefaultWatcher type: " + event.getType().name());

        System.out.println("DefaultWatcher path: " + event.getPath());

        System.out.println("==========DefaultWatcher end==============");
    }
}
复制代码

定义监听子节点变化的watcher:

/**
 * 用于监听子节点变化的watcher
 */
public class ChildrenWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {

        System.out.println("==========ChildrenWatcher start==============");

        System.out.println("ChildrenWatcher state: " + event.getState().name());

        System.out.println("ChildrenWatcher type: " + event.getType().name());

        System.out.println("ChildrenWatcher path: " + event.getPath());

        System.out.println("==========ChildrenWatcher end==============");
    }
}
复制代码

定义监听节点变化的watcher:

public class DataWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {

        System.out.println("==========DataWatcher start==============");

        System.out.println("DataWatcher state: " + event.getState().name());

        System.out.println("DataWatcher type: " + event.getType().name());

        System.out.println("DataWatcher path: " + event.getPath());

        System.out.println("==========DataWatcher end==============");
    }
}
复制代码

watcher测试代码:

public class WatcherTest {

    /**
     * 链接zk服务端的地址
     */
    private static final String CONNECT_STRING = "192.168.0.113:2181";

    public static void main(String[] args) {

        // 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为
        // 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的
        // 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,
        // 后续可以查阅curator重新注册watcher的实现方法。

        // 默认watcher
        DefaultWatcher defaultWatcher = new DefaultWatcher();
        // 监听子节点变化的watcher
        ChildrenWatcher childrenWatcher = new ChildrenWatcher();
        // 监听节点数据变化的watcher
        DataWatcher dataWatcher = new DataWatcher();
        try {
            // 创建zk客户端,并注册默认watcher
            ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);

            // 让默认watcher监听 /GetChildren 节点的子节点变化
            // zooKeeper.getChildren("/GetChildren", true);

            // 让childrenWatcher监听 /GetChildren 节点的子节点变化(默认watcher不再监听该节点子节点变化)
            zooKeeper.getChildren("/GetChildren", childrenWatcher);

            // 让dataWatcher监听 /GetChildren 节点本省的变化(默认watcher不再监听该节点变化)
            zooKeeper.getData("/GetChildren", dataWatcher, null);

            TimeUnit.SECONDS.sleep(1000000);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
复制代码

测试过程

1、首先在命令行客户端创建节点 /GetChildren

[zk: localhost:2181(CONNECTED) 133] create /GetChildren GetChildrenData
Created /GetChildren
复制代码

运行测试代码WatcherTest,输出以下内容:

==========DefaultWatcher start==============
DefaultWatcher state: SyncConnected
DefaultWatcher type: None
DefaultWatcher path: null
==========DefaultWatcher end==============
复制代码

可以看出在客户端第一次链接zk服务端时触发了链接成功的事件通知,该事件由默认watcher接收,导致默认watcher相关代码得到执行。 2、接着在命令行客户端创建子节点

[zk: localhost:2181(CONNECTED) 134] create /GetChildren/ChildNode ChildNodeData
Created /GetChildren/ChildNode
复制代码

ChildrenWatcher收到通知,/GetChildren的子节点发生变化,因此输出以下内容:

==========ChildrenWatcher start==============
ChildrenWatcher state: SyncConnected
ChildrenWatcher type: NodeChildrenChanged
ChildrenWatcher path: /GetChildren
==========ChildrenWatcher end==============
复制代码

3、最后在客户端修改/GetChildren节点数据

[zk: localhost:2181(CONNECTED) 135] set /GetChildren GetChildrenDataV2
cZxid = 0xab
ctime = Sat Sep 15 03:52:48 PDT 2018
mZxid = 0xb0
mtime = Sat Sep 15 04:06:05 PDT 2018
pZxid = 0xaf
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 1
复制代码

DataWatcher收到通知,输出以下内容:

==========DataWatcher start==============
DataWatcher state: SyncConnected
DataWatcher type: NodeDataChanged
DataWatcher path: /GetChildren
==========DataWatcher end==============
复制代码

我们可以接着在客户端修改/GetChildren节点数据,但是没有任何输出了,说明DataWatcher已经失效了,需要重新注册才能使用。

分布式系统的理解

分布式系统集群的特点

1、集群中所有节点维护的数据要一致 2、所有节点都可以提供相同的业务功能(不一定是在同一时刻提供) 3、集群需要保障系统的高可用,某个节点宕机不会影响服务。

集群环境下如何保障数据一致性

集群环境下有三种方式保障数据一致性:数据复制、WNR和集中存储。

1、数据复制:先向单节点写入,再复制到其他节点,zk解释这样实现的。或者多节点同时写入,但只适合多节点写入的数据不是相同数据的应用场景。在master-slave场景中, 同步复制(slave从master全部复制完成才给客户端返回写入成功)可保证强一致性,但会影响可用性;异步复制(数据写入master就返回写入成功,不需要等到slave复制完成, 之后master通过push向slave推送数据或者slave通过ull方式从master拉数据)可提供可用性但会降低一致性。

2、WNR:N代表总副本数,W代表每次写操作要保证的最少写成功的副本数,R代表每次读操作最少读取的副本数,当W+R>N时,可保证每次读取的数据至少有一个副本具有最新的更新 (例如可以通过版本号或者时间戳判断是哪个副本的数据是最新的),多个写操作的顺序难以保证,可能导致多副本的写操作顺序不一致,Dynamo通过箱量时钟在保证最终一致性。

3、集中存储:借助可靠性较高的集中存储,比如NAS存储,分布式缓存(Redis)等。

分布式系统之sharding

对于业务系统来说,就是业务拆分,不同的子模块单独是一个的集群,整体业务系统是个大分布式系统。对于数据系统来说,就是数据的拆分。

zk集群

1、zk集群是一种对等集群,所有节点(机器)数据都一样。 2、集群节点之间靠心跳感知彼此的存在。 3、所有写操作都在主节点,其他节点只能读,虽然也可以接收写请求,但是内部会把写操作转给主节点。 4、通过选举机制选出主节点,从而保障了主节点的高可用,这样主节点就不是固定的,万一主节点宕机还可以重新选举出主节点。 5、至少需要三个节点,而且节点个数必须是奇数。 6、当一半以上的数据写入成功后,则返回写入成功,是最终一致性策略。 如下图所示的zk集群架构

client发送到follower的写请求转发给leader,由leader执行完写操作后再同步到follower。

分布式系统值CAP理论

CAP的定义

C是一致性(Consistency),A是可用性(Availability)、P是分区容忍性(Partition tolerance)。 CAP的定义:一个分布式系统不可能同时满足一致性、可用性和分区容忍性这三个要求,只能满足其中两项。 1、一致性:分布式环境下,一致性主要指数据在多个副本之间是否一致,即通过某个节点的写操作结果对后面通过其他节点的读操作可见,如果数据更新后并发访问情况下可立即感知其更新, 称为强一致性,如果允许之后部分或者全部感知不到该更新,称为弱一致性,若在之后一段时间后,一定可以感知该更新,称为最终一致性。

2、可用性:是指系统提供的服务必须一致处于可用状态,对用户的请求总是能够在有限的时间内返回结果,有限的时间强调用户能够接受的时间,任何一个没有发生故障的节点必须在有限的 时间内返回合理的结果。

3、分区容忍性:集群出现网络割裂时,集群还能继续提供一定的可用性和一致性,除非整个网络不可用,也即部分节点宕机或者无法与其他节点通信时,各分区间还可保持分布式系统的功能。

只能满足两项不是说另外一项就完全没有,而是要求没有那么严格。分区容忍性是分布式系统必须有的特性,因为网络不可靠,只能在C和A之间权衡。

放弃CAP定理

1、放弃分区容忍性(P):即集群如果出现网络割裂的话,整个集群无法提供可用性和一致性服务,这种情况要么集群节点无状态,要么把所有数据放在一个节点上,这样就失去了扩展性, 也不能叫分布式系统了,所以,单及应用因为放弃了P,得到很好的CA。

2、放弃可用性(A):放弃可用性,并不是完全没有可用性,是指允许响应超时的时间可以更长,比如报表可以运行10分钟左右,甚至在某些情况下允许超时。

3、放弃一致性(C):放弃一致性是放弃数据的强一致性,而保留数据的最终一致性,即数据最终是完全一致的,但有一个时间窗口的问题,这需要根据不同的业务来定义。

分布式系统之base理论

BASE是Basically Available(基本可用)、Soft State(软状态)和Eventually Consistent(最终一致性)三个短语的简称,它是随CAP中的一致性和可用性权衡的结果。 BASE的核心思想是即使无法做到强一致性,但是每个应用可以根据自身的业务特点,才去适当的方式达到最终一致性,同时获取到系统可用性。

BASE之基本可用体现在两个方面: 1、响应时间上的损失:比如某些请求1秒内给出响应,有些请求可能会在5秒内给出响应 2、功能上的损失:例如对于电商系统来说,某些区域可能不能购买某些商品,又或者大促时,部分消费者被引流到降级页面。

BASE之弱状态: 也称为软状态,是指允许系统中的数据存在中间状态,并认为该状态不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间存在一定的延时。

BASE之最终一致性: 系统中的数据副本在经过一段时间同步后,组中能够达到一个一致的状态。

ZK自带客户端原生API

创建zk会话

org.apache.zookeeper.ZooKeeper类的构造方法用于创建zk客户端与服务端的会话。该类提供了如下几个构造方法

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

复制代码

构造方法参数说明:
1、connectString:指zk的服务器列表,以英文输入法逗号分割的host:port,比如192.168.1.1:2181,192.168.1.2:2181,也可以通过在后面跟着根目录,表示此客户端的操作 都是在此根目录下。
2、sessionTimeout:会话超时时间,单位毫秒,当在这个时间内没有收到心跳检测,会话就会失效。
3、watcher:注册的watcher,null表示不设置
4、canBeReadOnly:用于标识当前会话是否支持“Read-Only”模式:是指当zk集群中的某台机器与集群中过半以上的机器网络端口不通,则此机器将不会接收客户端的任何读写请求,但是有时,我们希望技能提供读请求,因此设置此参数为true,即客户端还能从与集群中半数以上节点网络不通的机器节点中读取数据。
5、sessionId和sessionPasswd:分别代表会话ID和会话秘钥,这两个参数一起可以唯一确定一个会话,客户端通过这两个参数可以实现客户端会话复用。

创建zk节点

org.apache.aookeeper.ZooKeeper类提供了如下创建zk节点的api:

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

复制代码

第一个方法以同步的方式创建节点,第二个方法以异步的方式创建节点。需要注意不论同步还是异步都不支持递归创建节点,当节点已经存在时,会抛出NodeExistsException。 create方法参数说明: 1、path:被创建的节点路径,比如:/zk-book/fock 2、data[]:节点中的数据,是一个字节数组 3、acl:ACL策略 4、createMode:节点类型,枚举类型,有四种选择:持久(PERSISTENT)、持久顺序(PERSISTENT_SEQUENTIAL)、临时(EPHEMENRAL)和临时顺序(EPHEMERAL_SEQUENTIAL)。 5、cb:异步回调函数,需要实现StringCallback接口,当服务器端创建完成后,客户端会自动调用这个对象的processResult。 6、ctx:用于传递一个对象,可以在回调方法执行的时候使用,通常用于传递业务的上下文信息。

删除zk节点

// 以同步的方式删除节点
public void delete(final String path, int version)
        throws InterruptedException, KeeperException
// 以异步的方式删除节点,如果写测试代码,客户端主线程不能退出,否则可能请求没有发到服物器或者异步回调不成功
public void delete(final String path, int version, VoidCallback cb, Object ctx)
复制代码

参数说明:
1、path:被删除节点的路径
2、version:节点的数据版本,如果指定的版本不是最新版本将会报错
3、cb:异步回调函数
4、ctx:传递的上下文信息,即操作之前的信息传递到删除之后的异步回调函数里面。

获取zk子节点

public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

复制代码

参数说明:
1、path:数据节点路径,比如/zk-book/foo,获取改路径下的子节点列表
2、watcher:给节点设置的watcher,如果path对应节点的子节点数量发生变化,将会得到通知
3、watch:是否使用默认的watcher
4、stat:指定数据节点的状态信息
5、cb:异步回调函数
6、ctx:用于传递一个对象,可以在回调方法执行的时候使用,通常用于传递业务的上下文信息

获取zk节点数据

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)

复制代码

修改zk节点数据

public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException
public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

复制代码

检查zk节点是否存在

public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(String path, boolean watch, StatCallback cb, Object ctx)
复制代码

zk API使用示例

package com.ctrip.flight.test.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.util.ArrayList;
import java.util.List;

public class ZkClientTest {

    private static final String PARENT_PATH = "/zkClientTest";

    private static final String CHILD_PATH = "/zkClientTest/childNodeTest";

    private static final String IDENTITY = "zhangsan:123456";

    public static void main(String[] args) {
        try {
            DefaultWatcher defaultWatcher = new DefaultWatcher();

            ChildrenWatcher childrenWatcher = new ChildrenWatcher();

            ParentWatcher parentWatcher = new ParentWatcher();

            // 创建会话
            ZooKeeper client = new ZooKeeper("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183", 30000, defaultWatcher);

            client.addAuthInfo("digest", IDENTITY.getBytes());

            Stat stat = client.exists(PARENT_PATH, false);
            if (null != stat) {
                client.delete(PARENT_PATH, -1);
            }

            // 创建节点,临时节点不能有子节点,所以父节点是持久节点
            client.create(PARENT_PATH, "zkClientTestData_v1".getBytes(), getAcl(), CreateMode.PERSISTENT);

            // 创建子节点
            client.create(CHILD_PATH, "childNodeData_v1".getBytes(), getAcl(), CreateMode.EPHEMERAL);

            // 获取子节点信息
            Stat childStat = new Stat();
            List<String> childs = client.getChildren(PARENT_PATH, childrenWatcher, childStat);
            System.out.println(PARENT_PATH + "'s childs:" + childs);
            System.out.println(PARENT_PATH + "'s stat:" + childStat);

            Thread.sleep(1000);

            // 获取父节点数据
            Stat parentStat = new Stat();
            byte[] parentData = client.getData(PARENT_PATH, parentWatcher, parentStat);
            System.out.println(PARENT_PATH + "'s data: " + new String(parentData));
            System.out.println(PARENT_PATH + "'s stat: " + parentStat);

            Thread.sleep(1000);

            // 设置子节点数据
            childStat = client.setData(CHILD_PATH, "childNodeData_v2".getBytes(), -1);
            System.out.println(CHILD_PATH + "'s stat:" + childStat);
            byte[] childData = client.getData(CHILD_PATH, false, childStat);
            System.out.println(CHILD_PATH + "'s data:" + new String(childData));

            Thread.sleep(1000);

            // 删除子节点
            client.delete(CHILD_PATH, -1);
            // 判断子节点是否存在
            childStat = client.exists(CHILD_PATH, false);
            System.out.println(CHILD_PATH + " is exist: " + (childStat != null));

            client.delete(PARENT_PATH, -1);

            client.close();

            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * ACL格式为:schema:id:permission
     * @return
     */
    private static List<ACL> getAcl() throws Exception {
        List<ACL> acls = new ArrayList<>();

        // 指定schema
        String scheme = "auth";
        // 指定id
        // String identity = "zhangsan:123456";
        Id id = new Id(scheme, DigestAuthenticationProvider.generateDigest(IDENTITY));

        // Perms.ALL的权限为crdwa
        ACL acl = new ACL(ZooDefs.Perms.ALL, id);

        acls.add(acl);

        return acls;
    }
}

复制代码

zk开源客户端curator

zk原生api的不足

1、连接的创建是异步的,需要开发人员自行编码实现等待 2、连接没有自动的超时重连机制 3、zk本身不提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化 4、Watcher注册一次只会生效一次,需要不断的重复注册。 5、Watcher本身的使用方式不符合java本身的术语,如果采用监听器的方式,更容易理解。 6、不支持递归创建树形节点。

zk第三方开源客户端

zk的第三方开源客户端主要有zkClient和Curator。其中zkClient解决了session会话超时重连、Watcher反复注册等问题,提供弄了更加简洁的api,但zkClient社区不活跃、 文档不够完善。而Curator是Apache基金会的顶级项目之一,它解决了session会话超时重连、watcher反复注册、NodeExistsException异常等问题,Curator具有更加完善的、 文档,因此我们这里只学习Curator的使用。

curator提供了一种类似jdk8中stream一样的流式操作。

创建zk会话

Curator中org.apache.curator.framework.CuratorFrameworkFactory类提供了如下两个创建zk会话的方法:

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

复制代码

1、connectString:逗号分开的ip:port对。 2、sessionTimeoutMs:会话超时时间,单位毫秒,默认60000ms,指链接建立完后多久没有收到心跳检测,超过该时间即为会话超时。 3、connectTImeoutMs:链接创建超时时间,单位毫秒,默认15000ms,指客户端与服务端建立连接时多长时间没连接上就算超时。 4、retryPolicy:重试策略,retryPolicy的类型定义如下:

   /**
    * Abstracts the policy to use when retrying connections
    */
    public interface RetryPolicy
    {
             /**
             * Called when an operation has failed for some reason. This method should return
             * true to make another attempt.
             *
               *
              * @param retryCount the number of times retried so far (0 the first time),第几次重试
              * @param elapsedTimeMs the elapsed time in ms since the operation was attempted,到当前重试时刻总的重试时间
              * @param sleeper use this to sleep - DO NOT call Thread.sleep,重试策略
              * @return true/false
              */
              public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
    }

复制代码

allowRetry返回true继续重试,返回false不再重试。可以通过实现该接口自定义策略,curator已经为我们提供了若干重试策略 1、ExponentialBackoffRetry:该重试策略随着重试次数的增加,sleep的时间呈指数增长。

  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

复制代码

第retryCount次重试的sleep时间计算方式为:baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),如果该值大于maxSleepMs,则sleep时间为maxSleepMs,如果重试次数大于maxRetries,则不再重试;

2、RetryNTimes:该重试策略指定重试次数,每次sleep固定时间,构造方法如下:

public RetryNTimes(int n, int sleepMsBetweenRetries)

复制代码

3、RetryOneTime:该重试策略只重试一次。 4、RetryUntilElapsed:该重试策略对重试次数不做限制,但对总的重试时间做限制。

public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

复制代码

除了newClient,CuratorFrameworkFactory还提供了一种建设者模式的方式来创建CuratorFramework对象。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);
CuratorFramework client =  CuratorFrameworkFactory.builder()
                .connectString("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183")
                .sessionTimeoutMs(30000).connectionTimeoutMs(15000)
                .retryPolicy(retryPolicy)
                .namespace("curatorTest")
                .build();

复制代码

创建zk节点

在curator中无论执行何种操作都必须先获得一个构建该操作的包装类(Builder对象),创建zk节点需要先获得一个org.apache.curator.framework.api.CreateBuilder (实际上是CreateBuilder的实现类CreateBuilderImpl)对象,然后用这个对象来创建节点。CreateBuilderImpl中常见的操作如下:

// 递归创建(持久)父目录
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()
// 设置创建节点的属性
public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
// 设置节点的acl属性
public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
// 指定创建节点的路径和节点上的数据
public String forPath(final String givenPath, byte[] data) throws Exception

复制代码

如下所示为创建一个节点的示例:

String test1Data = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/curatorTest/test1", "test1".getBytes());

复制代码

删除zk节点

同理先调用CuratorFramework的delete()获取构建删除操作的DeleteBuilder(实际上为DeleteBuilderImpl),DeleteBuilderImpl提供了如下方法:

// 指定要删除数据的版本号
public BackgroundPathable<Void> withVersion(int version)
// 确保数据被删除,本质上就是重试,当删除失败时重新发起删除操作
public ChildrenDeletable guaranteed()
// 指定删除的节点
public Void forPath(String path) throws Exception
// 递归删除子节点
public BackgroundVersionable deletingChildrenIfNeeded()

复制代码

读取zk节点数据

同理先调用CuratorFramework的getData()获取构建获取数据操作的GetDataBuilder(实际上为GetDataBuilderImpl),GetDataBuilderImpl提供了如下方法:

// 将节点状态信息保存到stat
public WatchPathable<byte[]> storingStatIn(Stat stat)
// 指定节点路径
public byte[] forPath(String path) throws Exception

复制代码

如下示例为获取节点数据:

Stat test1Stat = new Stat();
byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");
System.out.println("test1 data: " + new String(test1DataBytes));

复制代码

更新zk节点数据

同理先调用CuratorFramework的setData()先获取构建更新数据操作的setDataBuilder(实际上是setDataBuilderImpl),setDataBuilderImpl提供了如下方法:

// 指定版本号
public BackgroundPathAndBytesable<Stat> withVersion(int version)
// 指定节点路径和要更新的数据
public Stat forPath(String path, byte[] data) throws Exception

复制代码

读取zk子节点

同理先调用CuratorFramework的getChildren()获取构建获取子节点数据操作的GetChildrenBuilder(实际上为GetChildrenBuilderImpl),GetChildrenBuilderImpl 提供了如下方法:

// 把服务器端获取到的状态数据存储到stat对象中
public WatchPathable<List<String>> storingStatIn(Stat stat)
// 指定获取子节点数据的节点路径
public List<String> forPath(String path) throws Exception
// 设置watcher,类似于zookeeper本身的api,也只能使用一次
public BackgroundPathable<List<String>> usingWatcher(Watcher watcher)
public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher)

复制代码

示例代码:

Stat childStat = new Stat();
List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");
复制代码

curator的异步操作

Curator为所有操作都提供了异步执行的版本,只要在构建操作的方法链中添加如下方法之一即可。

public ErrorListenerPathable<List<String>> inBackground()
public ErrorListenerPathable<List<String>> inBackground(Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor)
复制代码

如下示例代码为使用异步删除操作:

client.delete()
          .guaranteed()
          .withVersion(-1)
          .inBackground(((client1, event) -> {
                    System.out.println(event.getPath() + ", data=" + event.getData());
                    System.out.println("event type=" + event.getType());
                    System.out.println("event code=" + event.getResultCode());
           }))
           .forPath("/curatorTest/test1");
复制代码

curator的NodeCache

NodeCache会将某一路径的节点(节点本身)在本地缓存一份,当zk中相应路径的节点发生更新、创建或者删除操作时,NodeCache会得到响应,并且会将最新的数据拉倒本地缓存中, NodeCache只会监听路径本身的变化,并不会监听子节点的变化。我们可以通过NodeCache注册一个监听器来获取发生变化的通知。NodeCache提供如下构造函数:

public NodeCache(CuratorFramework client, String path)
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

复制代码

1、client:curator客户端 2、path:需要缓存的节点路径 3、dataIsCompressed:是否压缩节点下的数据 NodeCache提供了一个如下类型的监听器容器,只要向容器中添加监听器,当节点发生变更时,容器中的监听器都会得到通知:

private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();

复制代码

NodeCache缓存数据及添加Listener的示例代码如下:

NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");
// 是否立即拉取/curatorTest/test1节点下的数据缓存到本地
nodeCache.start(true);
// 添加listener
nodeCache.getListenable().addListener(() -> {
        ChildData childData = nodeCache.getCurrentData();
        if (null != childData) {
             System.out.println("path=" + childData.getPath() + ", data=" + childData.getData() + ";");
        }
});

复制代码

curator的PathChildrenCache

PathChildrenCache会将指定路径节点下的所有子节点缓存到本地,但不会缓存节点本身的信息,当执行新增(CHILD_ADDED)、删除(CHILD_REMOVED)、更新(CHILD_UPDATED) 指定节点下的子节点等操作时,PathChildCache中的Listener会得到通知。

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

复制代码

PathChildrenCache通过start方法可以传入三种启动模式,这三种启动模式定义在org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode中。 1、NORMAL:异步初始化cache 2、BUILD_INITIAL_CACHE:同步初始化cache,以及创建cache后,就从服务器拉取对应的数据。 3、POST_INITIALIZED_EVENT:异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知。

PathChildrenCache示例代码如下:

PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);
// startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println(pathChildrenCache.getCurrentData().size());
pathChildrenCache.getListenable().addListener(((client1, event) -> {
            ChildData data = event.getData();
            switch (event.getType()) {
                case INITIALIZED:
                    System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");
                    System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());
                    break;
                case CHILD_ADDED:
                    System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
                    break;
                case CHILD_UPDATED:
                    System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
                    break;
                case CHILD_REMOVED:
                    System.out.println("删除子节点,path=" + data.getPath());
                    break;
                default:
                    System.out.println(event.getType());
            }
}));

复制代码

ZK选举及数据一致性

ZAB协议

ZAB(Zookeeper Atomic Broadcast)协议,即zk原子消息广播协议,协议内容大致如下:所有事务的请求必须由全局唯一的服务器来协调处理,这样的服务器被称为Leader服务器, 而余下的服务器则称为Follower服务器,Leader服务器负责将一个客户端的请求转换成一个事物Proposal(提议),并将该Proposal分发给集群中所有Follower服务器, 之后Leader服务器需要等待所有Follower服务器的反馈,一旦超过半数的Foll服务器进行了正确的反馈后,那么Leader就会再次向所有的Foll服务器分发Commit消息, 要求其将前一个Proposal提交。

zk集群中Leader的选举依赖此协议,数据的写入过程也需要依赖此协议,ZAB的核心是定义了那些会改变zk服务器数据状态的事务请求的处理方式。

ZK集群中节点角色

zk集群中节点有三种角色:
1、Leader:事务请求的唯一调度和处理者,保证集群事物处理的顺序性,同时也是集群内部各服务器的调度者。
2、Follower:处理客户端的非事务请求,转发事务请求给Leader服务器,参与事务请求Proposal的投票,参与Leader的选举投票。
3、Observer:处理客户端的非事务请求,转发事物请求给Leader服务器,不参与任何形式的投票,此角色存在通常是为了提高读性能。

ZK集群中节点的状态

ZK集群中节点存在如下几种状态:
1、LOOKING:寻找Leader的状态,当服务器处于此状态时,表示当前没有Leader,需要进入选举流程;
2、FOLLOWING:跟随者状态,表明当前服务器角色是Follower。
3、OBSERVER:观察者状态,表明当前服务器是Observer。
4、LEADING:领导者状态,表明当前服务器角色是Leader。

ZK集群中节点间的通信

ZK集群中的节点基于TCP协议进行通信,为避免重复创建两个节点之间的TCP连接,ZK按照myid数值方向来建立连接,即myid小的节点向myid大的节点发起连接,例如myid为1的节点向 myid为2的节点发起连接。节点之间配置两个通信端口,例如配置项server.1=localhost:2888:3888中第一个端口2888是通信和数据同步的端口,第二个端口3888是投票端口。

ZK中Leader选举算法

从3.4.0版本后zk只支持基于TCP协议的org.apache.zookeeper.server.quorum.FastLeaderElection选举算法。

Zk中Leader选举的触发时机

在以下两种情况下会触发Leader的选举:
1、集群启动:集群刚启动的时候节点处于LOOKING的状态,没有Leader,需要进入选举流程。
2、奔溃恢复:例如Leader宕机,或者因为网络原因导致过半节点与Leader心跳中断。

影响节点成为Leader的因素

ZK通过三个因素来判断一个节点能否成为Leader:
1、数据的新旧程度:只有最新的数据节点才有机会成为Leader,在zk中通过事务id(zxid)的大小来表示数据的新旧,越大代表数据越新。
2、myid:集群在启动的时候,会在数据目录下配置myid文件,里面的数字代表当前zk节点的编号,当zk节点数据一样新时,myid中数字越大的就会被选举成为Leader,当集群中已经 有Leader时,新加入得节点不会影响原来的集群。
3、投票数量:只有得到集群中多半的投票,才能成为Leader,多半即(n/2+1),n为集群中节点数量。

Leader的选举过程

为简单描述,我们以三个节点的zk集群为例,三个节点对应的myid为1,2,3.分析初次启动时Leader的选举和运行过程中Leader宕机后新Leader的选举。

初次启动时Leader的选举

初次启动时三个ZK节点都没有数据,Leader选举过程如下:
1、第一步:启动myid为1的节点,此时zxid为0,没法选出Leader节点
2、第二步:启动myid为2的节点,它的zxid也为0,但它的myid为2,更大一些,因此第二个节点成为Leader节点。
3、第三步:启动myid为3的节点,因为已经有了Leader节点,3加入集群后2还是Leader节点。

运行过程中Leader宕机后新Leader的选举

假设server2为主节点,并且server2宕机,剩下server1和server3进行Leader的选举。选举流程如下:
1、变更状态:Leader宕机后,其他节点的状态变为LOOKING。
2、生成投票信息:每个server发出一个投自己票的投票,假定生成的投票信息为(myid,zxid)的形式,server1的投票信息为(1,123),并将该投票信息发给server3, server3的投票信息为(3,122),并将该投票心思发给server1。
3、投票处理:server3收到server1的投票信息(1,123),发现该投票的zxid123比自己的122大,则server3修改自己的投票信息为(1,123),然后发给server1。
4、投票处理:server1收到server3的投票信息(3,122),发现server3的zxid122比自己的123小,则不改变自己的投票。
5、统计投票信息:server3统计收到的投票(包括自己投的),(1,123)是两票,server1统计收到的投票(包括自己投的),(1,123)是两票。
6、修改服务器状态:server3选出的Leader是1,而自己是3,因此自己进入FOLLOWING状态,即Follower角色。server1选出的Leader是1,自己就是1,因此进入LEADING状态, 即Leader角色。

ZK数据同步

当Leader完成选举后,Follower需要与新的Leader同步数据,在Leader端需要做如下工作:
1、Leader告诉其他Follower当前最新数据是什么即zxid,Leader会构建一个NEWLEADER包,包括当前最大的zxid,发送给所有的Follower或者Observer。
2、Leader给每个Follower创建一个县城LearnerHandler来负责处理每个Follower的数据同步请求,同时主线程开始阻塞,只有超过一半的Follower同步完成, 同步过程才完成,Leader才能成为真正的Leader。
3、Leader端根据同步算法进行同步操作。

而在Follower端会做以下工作:
1、选举完成后,尝试与Leader建立同步连接,如果一段时间没有连接上就报错超时,重新回到选举状态。
2、向Leader发送FOLLOWERINFO封包,带上自己最大的zxid
3、根据同步算法进行同步操作。

具体使用哪种同步算法取决于Follower当前最大的zxid,在Leader端会维护最小事务id:minCommittedLog和最大事务id:maxCommittedLog两个zxid, minCommittedLog是没有被快照存储的日志文件的第一条(每次快照存储完,会重新生成一个事务日志文件),maxCommittedLog是事务日志中最大的日志。 zk中实现了以下数据同步算法:
1、直接差异化同步(DIFF同步)
2、仅回滚同步(TRUNC),即删除多余的事务日志,比如原来的Leader节点宕机后又重新加入,可能存在它自己写入并提交但是别的节点还没来得及提交的数据。
3、先回滚(TRUNC)再差异化(DIFF)同步。
4、全量同步(SNAP)。

ZK广播流程

当zk集群选举完成,并且数据同步结束后即可开始对外提供服务,接收读写请求,当Leader接收客户端新的事务请求后,会向集群的Follower广播该事务请求,广播流程如下:
1、Leader首先会根据客户端的事务请求生成对应的事务修改提议,并根据zxid的顺序(收到多个客户端事务请求)向所有的Follower发送数据修改提议。
2、当Follower收到Leader的数据修改提议后,会根据接收的限售顺序处理这些提议,即如果收到了1,2,3三条数据修改提议,如果处理完成了第三条,则代表1,2条一定处理成功。
3、Leader收到Follower针对某个数据修改提议过半的正确反馈(ack)后,发起对该事务修改提议的提交,即重新发起一个事务提交的提议。
4、Follower收到事务提交的提议后,记录事务提交,并把数据更新到内存数据库。

公众号

关注公众号推送更多精彩文章