ZooKeeper Tutorial (3): Reading the ZooKeeper Source, the Watcher Mechanism and Cluster State Monitoring

2017-06-01
Original link: blog.csdn.net

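Preface: The next few posts walk through important parts of the ZooKeeper code. This one covers the Watcher mechanism and how to use Watchers to monitor cluster state. If you are not familiar with installing, configuring, and using ZooKeeper, see the two previous posts: (1) ZooKeeper Tutorial (1): Quick start, and implementing a distributed Barrier and Queue in Java (2) ZooKeeper Tutorial (2): Combining ZooKeeper with Dubbo, and how it works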

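(I) How Watchers work, with a source-code walkthrough
(1) Principle:
A client registers a Watcher with the server. When a specific event occurs on the server, the Watcher is triggered and the server sends an event notification to that client. The principle is simple, so how is it implemented?
(2) Walking through the source
The Watcher mechanism involves three parts: the client, the client-side WatchManager, and the ZooKeeper server. When the client registers a Watcher with the server, it also stores the Watcher object in its WatchManager. When the server triggers a Watcher event, it sends a notification to the client, and the client looks up the corresponding Watcher object and runs it. The server keeps its own bookkeeping in a class that is also named WatchManager; here is part of that class: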

(3) The WatchManager class

public class WatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

    public synchronized int size(){
        int result = 0;
        for(Set<Watcher> watches : watchTable.values()) {
            result += watches.size();
        }
        return result;
    }
    // synchronized for thread safety
    public synchronized void addWatch(String path, Watcher watcher) {
        // look up the watchers already registered on this path
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            // don't waste memory if there are few watches on a node
            // rehash when the 4th entry is added, doubling size thereafter
            // seems like a good compromise
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }
}
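
The two maps mirror each other: watchTable answers "who is watching this path?", while watch2Paths answers "which paths is this watcher on?". One thing the reverse index makes cheap is dropping every watch that belongs to a single watcher, for example when its connection goes away. The following is a minimal sketch of that idea, not the ZooKeeper code itself: `MiniWatchTable` and `removeWatcher` are names assumed here for illustration, and watchers are represented by plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a two-way watch index.
// Watchers are plain String ids here, purely for illustration.
class MiniWatchTable {
    private final Map<String, Set<String>> watchTable = new HashMap<>();  // path -> watchers
    private final Map<String, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    synchronized void addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>(4)).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Drop every watch held by one watcher by walking the reverse index.
    synchronized void removeWatcher(String watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return; // this watcher never registered anything
        }
        for (String path : paths) {
            Set<String> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }

    // Total number of (path, watcher) pairs, counted the same way as size() above.
    synchronized int size() {
        int result = 0;
        for (Set<String> watchers : watchTable.values()) {
            result += watchers.size();
        }
        return result;
    }
}
```

Without watch2Paths, removing one watcher would mean scanning every entry of watchTable.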

[Figure: Watcher registration and notification]

(4) The Watcher interface (the code is long, so only the important parts are shown)

public interface Watcher {
    public interface Event {
        // KeeperState and EventType enums omitted
    }

    abstract public void process(WatchedEvent event);
}

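Event contains two enums, KeeperState and EventType. The original post showed how their values correspond in a figure:
[Figure: correspondence between KeeperState and EventType values]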

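The process method is a callback (if callbacks are unfamiliar, look them up or leave a comment below and I will explain further). When ZooKeeper sends a Watcher event notification to the client, the client invokes the corresponding process method. The parameter of process is a WatchedEvent, which section (5) covers.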
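
For readers new to callbacks, here is a minimal sketch of the pattern on its own, with no ZooKeeper dependency. `MiniCallback` and `MiniNotifier` are names made up for this illustration; they only play the roles of Watcher and of the client library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method interface playing the role of Watcher.
interface MiniCallback {
    void process(String event);
}

// Hypothetical notifier playing the role of the client library.
class MiniNotifier {
    private final List<MiniCallback> callbacks = new ArrayList<>();

    // The caller hands over an object; nothing runs yet.
    void register(MiniCallback callback) {
        callbacks.add(callback);
    }

    // Later, the notifier calls back into every registered object.
    void fire(String event) {
        for (MiniCallback callback : callbacks) {
            callback.process(event);
        }
    }
}
```

The point is that the code registering the callback does not decide when process runs; the notifier does, which is why the Watcher's process method is only executed once a notification arrives.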

(5) The WatchedEvent class

public class WatchedEvent {
    final private KeeperState keeperState;
    final private EventType eventType;
    private String path;
    // constructors and getters omitted
    /**
     *  Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), 
                                keeperState.getIntValue(), 
                                path);
    }
}

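As shown above, WatchedEvent has three fields: the notification state (KeeperState), the event type (EventType), and the node path (path). ZooKeeper uses a WatchedEvent object to wrap a server-side event and pass it to the Watcher, so that the process callback can handle it.
Note that getWrapper returns a WatcherEvent. WatcherEvent and WatchedEvent represent the same thing: both wrap a server-side event. The difference is that WatchedEvent is a logical event, the object that server and client code work with while running, whereas WatcherEvent, as its source shows, implements the serialization interface and can therefore be sent over the network.
So the whole flow is: the server calls getWrapper to wrap the event as a WatcherEvent; when the client receives it, it first restores the WatcherEvent to a WatchedEvent and passes that to process; the process callback can then read the complete server-side event from it.
That covers notifying the client and storing Watchers. Next is how the client registers a Watcher.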
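
The wrap-and-restore round trip can be sketched without the real classes. Everything named `Mini...` below is a hypothetical stand-in, only a few enum values are listed, and the integer codes are chosen for illustration; the sketch only shows the shape of the conversion between a typed logical event and a flat wire event.

```java
// Hypothetical stand-in for EventType; codes are for illustration only.
enum MiniEventType {
    NONE(-1), NODE_CREATED(1), NODE_DELETED(2), NODE_DATA_CHANGED(3), NODE_CHILDREN_CHANGED(4);

    private final int intValue;
    MiniEventType(int intValue) { this.intValue = intValue; }
    int getIntValue() { return intValue; }

    static MiniEventType fromInt(int value) {
        for (MiniEventType t : values()) {
            if (t.intValue == value) return t;
        }
        throw new IllegalArgumentException("unknown event type: " + value);
    }
}

// Hypothetical stand-in for KeeperState; codes are for illustration only.
enum MiniKeeperState {
    DISCONNECTED(0), SYNC_CONNECTED(3), EXPIRED(-112);

    private final int intValue;
    MiniKeeperState(int intValue) { this.intValue = intValue; }
    int getIntValue() { return intValue; }

    static MiniKeeperState fromInt(int value) {
        for (MiniKeeperState s : values()) {
            if (s.intValue == value) return s;
        }
        throw new IllegalArgumentException("unknown state: " + value);
    }
}

// Wire form: only ints and a string, so it is trivial to serialize.
final class MiniWireEvent {
    final int type;
    final int state;
    final String path;

    MiniWireEvent(int type, int state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }
}

// Logical form: typed enums, convenient for application code.
final class MiniLogicalEvent {
    final MiniEventType type;
    final MiniKeeperState state;
    final String path;

    MiniLogicalEvent(MiniEventType type, MiniKeeperState state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }

    // Receiver side: restore the logical event from the wire form.
    MiniLogicalEvent(MiniWireEvent wire) {
        this(MiniEventType.fromInt(wire.type), MiniKeeperState.fromInt(wire.state), wire.path);
    }

    // Sender side: flatten to the wire form.
    MiniWireEvent getWrapper() {
        return new MiniWireEvent(type.getIntValue(), state.getIntValue(), path);
    }
}
```

Keeping the two forms separate means application code never handles raw integer codes, while the network layer never needs to know about the enums.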

(6) Client-side Watcher registration
Start with one line of code:

zk = new ZooKeeper(HostPort,2000,connectionWatcher);

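In this line, the third argument is a Watcher that serves as the default Watcher for the whole ZooKeeper session; it is kept in the defaultWatcher field of the client-side ZKWatchManager. ZKWatchManager is a static nested class of ZooKeeper; part of its code follows: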

private static class ZKWatchManager implements ClientWatchManager {
        // Watchers for data changes
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        // Watchers for child-node changes
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

        private volatile Watcher defaultWatcher;

        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }
 }

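Almost all client requests are handled in ClientCnxn. When a request carries a Watcher, the client marks the request as setting a watch and wraps the registration details in a WatchRegistration object, which temporarily holds the mapping between the data node's path and the Watcher.

In ZooKeeper, a Packet is the smallest unit of the communication protocol, i.e. a data packet. Packets carry the network traffic between client and server, and any object that needs to be transmitted must be wrapped in a Packet. In ClientCnxn the WatchRegistration is also attached to the Packet, and queuePacket puts the Packet on the outgoing queue, where it waits for the SendThread to send it. This is another asynchronous step; asynchronous communication is a widely accepted practice in distributed systems. Later, the SendThread receives the server's response in readResponse and calls finishPacket, which takes the Watcher out of the Packet and registers it in ZKWatchManager: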

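private void finishPacket(Packet p) {
    // register the Watcher, passing the reply's result code
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        // synchronous call: wake up the thread waiting on this packet
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        // asynchronous call: hand the packet to the event thread for its callback
        p.finished = true;
        eventThread.queuePacket(p);
    }
}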
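
The two branches of finishPacket correspond to the two ways a request can be issued: a synchronous caller blocks on the packet until it is marked finished, while an asynchronous caller supplies a callback that is run later from a queue. A minimal sketch of both paths, assuming hypothetical `MiniPacket` and `MiniCnxn` classes (the real client drains its queue on a dedicated event thread, which is left out here):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified packet: a null callback means the caller is blocked waiting.
class MiniPacket {
    final Runnable callback;
    boolean finished;

    MiniPacket(Runnable callback) {
        this.callback = callback;
    }
}

// Hypothetical, simplified connection object.
class MiniCnxn {
    // Stands in for the event thread's queue of pending callbacks.
    private final Queue<MiniPacket> eventQueue = new ConcurrentLinkedQueue<>();

    // Called by the receiving side once the reply for p has arrived.
    void finishPacket(MiniPacket p) {
        if (p.callback == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll(); // wake the synchronous caller
            }
        } else {
            p.finished = true;
            eventQueue.add(p); // callback runs later, off the receiving thread
        }
    }

    // Synchronous caller side: block until the packet is finished.
    // Returns false if the waiting thread was interrupted.
    boolean awaitFinished(MiniPacket p) {
        synchronized (p) {
            while (!p.finished) {
                try {
                    p.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    // Run all queued callbacks and return how many were run.
    int runCallbacks() {
        int count = 0;
        MiniPacket p;
        while ((p = eventQueue.poll()) != null) {
            p.callback.run();
            count++;
        }
        return count;
    }
}
```

The `while (!p.finished)` loop around `wait()` guards against spurious wakeups and against the reply arriving before the caller starts waiting.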

We can register a Watcher with the ZooKeeper server through three APIs: getData, getChildren, and exists. getChildren is used as the example here:

public List<String> getChildren(final String path, Watcher watcher,
            Stat stat)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ChildWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getChildren2);
        GetChildren2Request request = new GetChildren2Request();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetChildren2Response response = new GetChildren2Response();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getChildren();
    }

Next, the code of register:

/**
         * Register the watcher with the set of watches on path.
         * @param rc the result code of the operation that attempted to
         * add the watch on the path.
         */
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }

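In this method the client hands the Watcher object over to ZKWatchManager, where it ends up in one of the Map fields shown earlier (childWatches for a getChildren call, dataWatches for getData). Each map associates a data node's path with the set of Watcher objects registered on it.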
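
The role of the result code in register can be sketched as follows. This is a simplified model under stated assumptions, not the client's actual code: the `Mini...` classes are hypothetical, watchers are plain strings, and the `NO_NODE` value is an illustrative error code. It assumes that a normal registration is kept only when the request succeeded, and that an exists-style registration is additionally kept when the node is missing, so the caller can be told when the node appears.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical holder for the three client-side watch maps (path -> watchers).
class MiniClientWatches {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();
}

// Hypothetical base class: decides whether and where to store a watch.
abstract class MiniRegistration {
    static final int OK = 0;
    static final int NO_NODE = -101; // illustrative "node does not exist" code

    final MiniClientWatches manager;
    final String watcher;
    final String clientPath;

    MiniRegistration(MiniClientWatches manager, String watcher, String clientPath) {
        this.manager = manager;
        this.watcher = watcher;
        this.clientPath = clientPath;
    }

    abstract Map<String, Set<String>> getWatches(int rc);

    // By default a watch is only kept if the request succeeded.
    boolean shouldAddWatch(int rc) {
        return rc == OK;
    }

    void register(int rc) {
        if (!shouldAddWatch(rc)) {
            return;
        }
        Map<String, Set<String>> watches = getWatches(rc);
        synchronized (watches) {
            watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }
}

// getChildren-style registration: always goes to childWatches.
class MiniChildRegistration extends MiniRegistration {
    MiniChildRegistration(MiniClientWatches manager, String watcher, String clientPath) {
        super(manager, watcher, clientPath);
    }

    @Override
    Map<String, Set<String>> getWatches(int rc) {
        return manager.childWatches;
    }
}

// exists-style registration: also kept when the node is missing.
class MiniExistsRegistration extends MiniRegistration {
    MiniExistsRegistration(MiniClientWatches manager, String watcher, String clientPath) {
        super(manager, watcher, clientPath);
    }

    @Override
    Map<String, Set<String>> getWatches(int rc) {
        return rc == OK ? manager.dataWatches : manager.existWatches;
    }

    @Override
    boolean shouldAddWatch(int rc) {
        return rc == OK || rc == NO_NODE;
    }
}
```

Registering only after the reply arrives avoids keeping a client-side Watcher for a request that the server rejected.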

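(7) How the server handles Watchers
The sequence diagram is as follows (one I came across and found quite good):
[Figure: sequence diagram of server-side Watcher handling]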

The following walks briefly through the main code:
When FinalRequestProcessor receives a client request, it calls processRequest, which hands off to ZooKeeperServer's processTxn for further processing; the result is returned by ZKDatabase.

processRequest code

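public void processRequest(Request request) {
    // ... earlier part of the method omitted
    ProcessTxnResult rc = null;
    if (request.hdr != null) {
        TxnHeader hdr = request.hdr;
        Record txn = request.txn;

        rc = zks.processTxn(hdr, txn);
    }
    // ... rest of the method omitted
}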

ZooKeeperServer code

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // apply the transaction to the in-memory database
    rc = getZKDatabase().processTxn(hdr, txn);
    if (opCode == OpCode.createSession) {
        if (txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.addSession(sessionId, cst.getTimeOut());
        } else {
            LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}

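ZKDatabase code (ZKDatabase.processTxn delegates to DataTree.processTxn, which contains the switch excerpted here)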

public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    // ... declaration of rc and other setup omitted
    switch (header.getType()) {
    // ... other cases omitted
    case OpCode.setData:
        SetDataTxn setDataTxn = (SetDataTxn) txn;
        rc.path = setDataTxn.getPath();
        rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
                setDataTxn.getVersion(), header.getZxid(), header.getTime());
        break;
    // ... other cases omitted
    }
    // ... rest of the method omitted
}

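For a request that registers a Watcher, FinalRequestProcessor's processRequest method checks whether the request needs a Watcher registered. If so, it passes the current ServerCnxn object and the data node path into getData. ServerCnxn is the interface for a connection between a ZooKeeper client and the server, and represents one such connection. The process callback discussed in this post is in fact invoked from here, so a ServerCnxn can be treated as a Watcher object. The node path and the ServerCnxn are eventually stored in WatchManager's watchTable and watch2Paths.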

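As mentioned, WatchManager is responsible for triggering Watcher events. It is a general-purpose class: on the server, DataTree holds two WatchManager instances, dataWatches and childWatches, for data-change Watchers and child-change Watchers respectively. When a create, delete, or data-change event occurs, DataTree calls the corresponding method, which invokes WatchManager's triggerWatch. That method returns the set of Watchers it triggered, and from there the flow moves on to the process callbacks.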

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // Wrap the event type (EventType), notification state (KeeperState),
    // and node path in a WatchedEvent object
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // Look up the Watchers for this path in watchTable. If there are none,
        // no client has registered a Watcher on this node, so return at once.
        // Otherwise take them out and remove them from both watchTable and
        // watch2Paths: a Watcher is one-shot and is invalid once triggered.
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // For a request that registers a Watcher, ZooKeeper stores the request's
        // ServerCnxn as the Watcher, so this process call is really ServerCnxn's
        w.process(e);
    }
    return watchers;
}

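Handling a Watcher therefore takes the following steps:
1. Wrap the event type (EventType), notification state (KeeperState), and node path in a WatchedEvent object.
2. Look up the Watchers for the node path in watchTable. If none is found, no client has registered a Watcher on that node, and the method returns immediately. If Watchers are found, take them out and remove them from both watchTable and watch2Paths; a Watcher is one-shot and becomes invalid once triggered.
3. For a request that registers a Watcher, ZooKeeper stores the request's ServerCnxn as the Watcher, so the process method called here is really ServerCnxn's. It marks the reply header with "-1" to indicate a notification, wraps the WatchedEvent as a WatcherEvent for network serialization, and sends the notification to the client. The real callback runs on the client.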
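
The three steps can be put together in a small self-contained model. This is a sketch under stated assumptions rather than the server's code: `MiniWatcher`, `MiniConnection`, and `MiniServerWatchManager` are hypothetical names, events are reduced to two strings, and "sending" a notification is simulated by appending to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical watcher interface; an event is just a type name plus a path here.
interface MiniWatcher {
    void process(String eventType, String path);
}

// Stands in for a server-side connection: processing an event means
// sending a notification whose header is marked with -1.
class MiniConnection implements MiniWatcher {
    static final int NOTIFICATION_XID = -1;
    final List<String> sent = new ArrayList<>();

    @Override
    public void process(String eventType, String path) {
        sent.add("xid=" + NOTIFICATION_XID + " " + eventType + " " + path);
    }
}

// Hypothetical, simplified server-side watch manager with one-shot triggers.
class MiniServerWatchManager {
    private final Map<String, Set<MiniWatcher>> watchTable = new HashMap<>();
    private final Map<MiniWatcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, MiniWatcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>(4)).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Returns the watchers that were triggered (an empty set if there were none).
    Set<MiniWatcher> triggerWatch(String path, String eventType) {
        Set<MiniWatcher> watchers;
        synchronized (this) {
            // Step 2: take the watchers out of both maps, which makes them one-shot.
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                return Collections.emptySet();
            }
            for (MiniWatcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Step 3: call back outside the lock; here that "sends" the notification.
        for (MiniWatcher w : watchers) {
            w.process(eventType, path);
        }
        return watchers;
    }
}
```

Triggering the same path twice delivers only one notification unless the watch is registered again in between, which is why a client that wants continuous monitoring has to re-register inside its callback.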

(8) Cluster monitoring example:

package com.shaoqing.zookeeper3;

import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterMonitor implements Runnable {
    private static String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    boolean alive = true;

    public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
        connectionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Watcher.Event.EventType.None
                        && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("\nconnectionWatcher Event Received: " + event.toString());
                }
            }
        };

        childrenWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("\nchildrenWatcher Event Received: " + event.toString());
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        // Get current list of child znode and reset the watch
                        List<String> children = zk.getChildren(membershipRoot, this);
                        System.out.println("Cluster Membership change,Members: " + children);
                    } catch (KeeperException ex) {
                        throw new RuntimeException(ex);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        alive = false;
                        throw new RuntimeException(ex);
                    }
                }
            }
        };

        zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
        // Ensure the parent znode exists
        if (zk.exists(membershipRoot, false) == null) {
            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Set a watch on the parent znode
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.err.println("Members:" + children);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                while (alive) {
                    wait();
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        if (args.length != 1) {
            System.err.println("Usage: ClusterMonitor <Host:Port>");
            System.exit(1);
        }
        String hostPort = args[0];
        new ClusterMonitor(hostPort).run();
    }

}

ClusterClient code

package com.shaoqing.zookeeper3;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterClient implements Watcher, Runnable {
    private static String membershipRoot = "/Members";
    ZooKeeper zk;

    public ClusterClient(String hostPort, Long pid) {
        String processId = pid.toString();
        try {
            zk = new ZooKeeper(hostPort, 2000, this);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        if (zk != null) {
            try {
                zk.create(membershipRoot + '/' + processId, processId.getBytes(), Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
            } catch (KeeperException | InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("\nEvent Received: " + event.toString());
    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: ClusterClient <Host:Port>");
            System.exit(1);
        }

        String hostPort = args[0];
        // Get the process id
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int index = name.indexOf('@');
        Long processId = Long.parseLong(name.substring(0, index));
        new ClusterClient(hostPort, processId).run();
    }

}

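The example above shows how to set a watch on a znode; when that znode changes, the corresponding method is triggered to handle it. This approach can be used for data monitoring, cluster state monitoring, and similar purposes.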

Git repository:
github.com/wacxt/zooke…

References: 1. 《从 Paxos 到 ZooKeeper》 (From Paxos to ZooKeeper)

