ZooKeeper (三)源码剖析: 服务端网络连接层 NIO

1,299 阅读7分钟

前言

Debug 一下 ZooKeeper 的客户端和服务端的网络连接源码,主要是希望知道当客户端发一个请求过来时,ZooKeeper 都做了哪些操作。上两篇文章已经记述了调试环境的搭建和群首选举的,期望可以熟悉一下 ZooKeeper 的设计。

  1. ZooKeeper 群首选举 源码剖析
  2. Zookeeper 架设源码调试环境

从网络连接开始

在群首选举中,我们除了分析里面的选举算法,还特意看了一下算法下面的基础网络连接是怎么样的。这次也先从网络连接层开始,继续细节分析。为啥要重复分析网络连接层?原因有两点:

  1. 因为我自己在网络连接层的方面,知道的理论比较多,但实操比较少;
  2. 所有的服务程序,中间件都必须有网络连接层,多读读这些代码,理解他们对网络连接的解决方案;

JDK NIO 的服务启动

PS:阅读源码最好的办法还是从问题出发,而不是从main方法出发。我从main方法出发,是因为还是一个源码阅读的初学者,希望知悉一个路径的所有细节,也就是垂直阅读源码。等到明白各个组件main方法的大同小异(例如:网络组件等如何start),后面的源码就会直接选择一个问题来阅读了。——Note

NIOServerCnxnFactory 是默认的连接管理器,内部是使用 JDK NIO 来实现网络连接的。相比下,群首选举 是使用的 BIO 做长连接。下面简单看看连接器内部的分工,还有类的关键字段:

NIOServerCnxnFactory {

    WorkerService workerPool 
    // 一个工作者池,专门用于执行具体的IO操作的。这里注意区别于 SelectorThread

    AcceptThread {
        // 负责Channel建立的线程 类似 Netty Boss 线程
        
        private final ServerSocketChannel acceptSocket;  // NIO 服务端 Channel
        private final SelectionKey acceptKey;
        
        private final Collection<SelectorThread> selectorThreads;   // Workers 们
    }
    
    SelectorThread {
        // 负责处理事件的线程 负责轮询 SelectionKey,
        // 假如有对应事件,封装成IOWorkRequest,放进队列里,让 WorkerService 去执行
    
        private final Queue<SocketChannel> acceptedQueue;       // 这个线程负责的数据
        private final Queue<SelectionKey> updateQueue;
    }
    
    IOWorkRequest {
        // 对请求进行一次封装
        // 属于生产-消费者模型
    }
}

NIOServerCnxnFactory 启动过程

    // NIOServerCnxnFactory.java start()
    public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                // 启动工作线程组
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            // 先检查状态,确保 acceptThread 只会启动一次
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

AcceptThread 类的工作,只要是监听连接,并把 Channel 挂到 SelectorThread 线程中。

    // AcceptThread.java 的run方法
    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    select();       // 循环遍历此处
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    // 这里需要忽略所有异常,保护 AcceptThread
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }
    
    private void select() {
        try {
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    if (!doAccept()) {  // 执行接受连接的方法
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }
    
    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;        // acceptSocket 返回的客户端 SocketChannel
        try {
            // 处理 Accpet 事件
            sc = acceptSocket.accept();
            accepted = true;
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();  // 形成循环闭环
            }
            SelectorThread selectorThread = selectorIterator.next();
            if (!selectorThread.addAcceptedConnection(sc)) {        // 把 客户端 SocketChannel 注册到某一个 SelectorThread 中去
                throw new IOException("Unable to add connection to selector queue"
                                      + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
            // 至此,Accept 接受连接的工作已经完成了。
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }

小结 AcceptThread.java 的代码:

  • 持有 ServerSocketChannel acceptSocket,用于接受连接
  • 持有 selectorThreads , 接受连接后把 Channel 挂在 某个 SelectorThread 上

接下来,查看一下 SelectorThread 需要负责的事,根据 NIO 的习惯,SelectorThread 就要遍历自己的 Channel 是否可读,假如可读,则把数据从 Buffer 提取出来,根据约定切割,包装成独立的请求,入队,以提供给 WorkerPool(一个线程池组) 去使用。

// SelectorThread.java 
private void select() {
    try {
        selector.select();

        Set<SelectionKey> selected = selector.selectedKeys();
        ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
        Collections.shuffle(selectedList);
        Iterator<SelectionKey> selectedKeys = selectedList.iterator();
        while (!stopped && selectedKeys.hasNext()) {
            // NIO 的常规操作,获取Key,并响应Key的定义的操作
            SelectionKey key = selectedKeys.next();
            selected.remove(key);

            if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
            }
            if (key.isReadable() || key.isWritable()) {
                handleIO(key); // 从此处debug进去
            } else {
                LOG.warn("Unexpected ops in select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

// debug 到此处
private void handleIO(SelectionKey key) {
    // 对 IO 事件做一次封装
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its
    // connection
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    // 提交到 Pool 中,让 Worker 去消费
    workerPool.schedule(workRequest);
}

至此,我们已经明白 ZooKeeper 服务端,从启动到 NIO 提供服务的状态了,并且客户端连接后,最终请求会被包转被提交到 WorkerService.java 中被执行,WorkerService 只是一个执行池,最终处理请求的是 NIOServerCnxn.doIO(key)。绕了很多圈- -

至此,我们假设,网络连接已经能够建立了。

-Dzookeeper.serverCnxnFactory="org.apache.zookeeper.server.NettyServerCnxnFactory" 可以选择启动 NettyServerCnxnFactory 连接管理器,这个实现可以提供 SSL 服务。

NIOServerCnxn 如何处理 IO 操作

NIO 的网络处理模型大多数其实是相似,看看 IORequest 后面会如何继续处理IO操作。

// 每一个请求的包装到包括:要做什么,和做事情的对象。
private class IOWorkRequest extends WorkerService.WorkRequest {

    // 这个 IO 操作最终会传递到 cnxn.doIO 去处理
    public void doWork() {
       if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        if (key.isReadable() || key.isWritable()) {
            cnxn.doIO(key);     // 执行IO操作

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }
    }
}

NIOServerCnxn.java 
/**
 * Handles read/write IO on connection.
 */
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);      // 判断buffer里的数值是否已经足够 |len(data)|data...}
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();      // 读取有效载荷,进行真正的操作。从这里切入
                } else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        // ... 省略一系列异常处理
    }
}

// 读取数据,进行操作
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            readConnectRequest();   // 处理连接请求
        } else {
            readRequest();          // 处理事务请求
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

至此 NIOServerCnxn.java 的工作就告一段落了。接下来它会通过 readRequestreadConnectRequest 将请求传递到 ZooKeeperServer 中去。

我们停一下,假设我们都理解了 NIO 提供的字节码网络层服务,现在来思考 NIOServerCnxn 的生命周期和它主要做的是什么工作:

  1. 当客户端连接到服务端时,由 NIOServerCnxnFactory.AcceptThread 构建被保存在 SelectionKeyattachment 中.
  2. 内部持有的对象包括如下:
    • NIOServerCnxnFactory
    • SocketChannel 持有的客户端连接
    • SelectorThread 管理的 Selector
    • SelectionKey
    • incomingBuffer outgoingBuffers // Buffer 缓冲
  3. 由此可见,NIOServerCnxn 是服务端用来维护客户端的连接对象,这个对象包含了 BufferSocket 等信息,并提供以下服务:
    • 读取数据时:向下是直接读取 NIO 的 Buffer , 向上会提交对象到 RequestThrottler(限流器)
    • 写数据时:通过写数据到 outgongBuffers,在唤醒Selector,SelectorThread 会通知 WorkerService 去进行 doIO 操作。
    • NIOServerCnxn 还兼顾实现 Data 的 Watch 实现。暂不展开聊
  4. NIOServerCnxn 的 close 有很多中可能,具体可以查看 DisconnectReason。

后记

  • 读源码,既要深入分析,也要抬头总结,分析项目的模块设计,建立树状知识网。不然很快就会忘了
  • 硬吃 NIO 的源码比较吃力,需要及时补充 JDK-NIO 的知识和概念,自己去并发编程网读了文档,还有写了 NIO 的 Demo 后,对阅读 ZooKeeper 的网络连接源码更加容易了。
  • ZooKeeper 网络层使用 JDK NIO 实现,网络层和应用层通过 NIOServerCnxn 作为中介进行数据处理
  • 任何的复杂项目大多数都是自顶向下的设计,再深入阅读源码的过程中,最好可以分模块或者分层次阅读。
    • 例如:可以尝试阅读 网络层 的,那就针对网络层的去读,超越网络层就可以暂停不读了
  • 文章并没完全写下自己思考的内容,一方面是自己还没完全确认,所以不容易落笔,希望可以通过讨论继续完成学习。
  • 文章有几张图的话,会容易很多。

预告

  • ZooKeeper 在处理请求的时候,RequestProcessor 非常有意思,在分析 ZooKeeper 数据模型和实现的时候会继续分析。

引用

  1. 腾讯 ZooKeeper 源码和实践揭秘
  2. 【Zookeeper】源码分析之网络通信(一)
  3. 【Zookeeper】源码分析之网络通信(二)
  4. SelectionKey 是什么?