前言
Debug 一下 ZooKeeper 的客户端和服务端的网络连接源码,主要是希望知道当客户端发一个请求过来时,ZooKeeper 都做了哪些操作。上两篇文章已经记述了调试环境的搭建和群首选举的,期望可以熟悉一下 ZooKeeper 的设计。
从网络连接开始
在群首选举中,我们除了分析里面的选举算法,还特意看了一下算法下面的基础网络连接是怎么样的。这次也先从网络连接层开始,继续细节分析。为啥要重复分析网络连接层?原因有两点:
- 因为我自己在网络连接层的方面,知道的理论比较多,但实操比较少;
- 所有的服务程序,中间件都必须有网络连接层,多读读这些代码,理解他们对网络连接的解决方案;
JDK NIO 的服务启动
PS:阅读源码最好的办法还是从问题出发,而不是从main方法出发。我从main方法出发,是因为还是一个源码阅读的初学者,希望知悉一个路径的所有细节,也就是垂直阅读源码。等到明白各个组件main方法的大同小异(例如:网络组件等如何start),后面的源码就会直接选择一个问题来阅读了。——Note
NIOServerCnxnFactory
是默认的连接管理器,内部是使用 JDK NIO 来实现网络连接的。相比下,群首选举 是使用的 BIO 做长连接。下面简单看看连接器内部的分工,还有类的关键字段:
NIOServerCnxnFactory {
WorkerService workerPool
// 一个工作者池,专门用于执行具体的IO操作的。这里注意区别于 SelectorThread
AcceptThread {
// 负责Channel建立的线程 类似 Netty Boss 线程
private final ServerSocketChannel acceptSocket; // NIO 服务端 Channel
private final SelectionKey acceptKey;
private final Collection<SelectorThread> selectorThreads; // Workers 们
}
SelectorThread {
// 负责处理事件的线程 负责轮询 SelectionKey,
// 假如有对应事件,封装成IOWorkRequest,放进队列里,让 WorkerService 去执行
private final Queue<SocketChannel> acceptedQueue; // 这个线程负责的数据
private final Queue<SelectionKey> updateQueue;
}
IOWorkRequest {
// 对请求进行一次封装
// 属于生产-消费者模型
}
}
NIOServerCnxnFactory 启动过程
// NIOServerCnxnFactory.java start()
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
// 启动工作线程组
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
// 先检查状态,确保 acceptThread 只会启动一次
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
AcceptThread 类的工作,只要是监听连接,并把 Channel 挂到 SelectorThread 线程中。
// AcceptThread.java 的run方法
public void run() {
try {
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
select(); // 循环遍历此处
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
// 这里需要忽略所有异常,保护 AcceptThread
LOG.warn("Ignoring unexpected exception", e);
}
}
} finally {
closeSelector();
// This will wake up the selector threads, and tell the
// worker thread pool to begin shutdown.
if (!reconfiguring) {
NIOServerCnxnFactory.this.stop();
}
LOG.info("accept thread exitted run method");
}
}
private void select() {
try {
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
if (!doAccept()) { // 执行接受连接的方法
// If unable to pull a new connection off the accept
// queue, pause accepting to give us time to free
// up file descriptors and so the accept thread
// doesn't spin in a tight loop.
pauseAccept(10);
}
} else {
LOG.warn("Unexpected ops in accept select {}", key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null; // acceptSocket 返回的客户端 SocketChannel
try {
// 处理 Accpet 事件
sc = acceptSocket.accept();
accepted = true;
if (limitTotalNumberOfCnxns()) {
throw new IOException("Too many connections max allowed is " + maxCnxns);
}
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
}
LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
// Round-robin assign this connection to a selector thread
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator(); // 形成循环闭环
}
SelectorThread selectorThread = selectorIterator.next();
if (!selectorThread.addAcceptedConnection(sc)) { // 把 客户端 SocketChannel 注册到某一个 SelectorThread 中去
throw new IOException("Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
acceptErrorLogger.flush();
// 至此,Accept 接受连接的工作已经完成了。
} catch (IOException e) {
// accept, maxClientCnxns, configureBlocking
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
}
return accepted;
}
小结 AcceptThread.java 的代码:
- 持有 ServerSocketChannel acceptSocket,用于接受连接
- 持有 selectorThreads , 接受连接后把 Channel 挂在 某个 SelectorThread 上
接下来,查看一下 SelectorThread 需要负责的事,根据 NIO 的习惯,SelectorThread 就要遍历自己的 Channel 是否可读,假如可读,则把数据从 Buffer 提取出来,根据约定切割,包装成独立的请求,入队,以提供给 WorkerPool(一个线程池组) 去使用。
// SelectorThread.java
private void select() {
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while (!stopped && selectedKeys.hasNext()) {
// NIO 的常规操作,获取Key,并响应Key的定义的操作
SelectionKey key = selectedKeys.next();
selected.remove(key);
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable() || key.isWritable()) {
handleIO(key); // 从此处debug进去
} else {
LOG.warn("Unexpected ops in select {}", key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
// debug 到此处
private void handleIO(SelectionKey key) {
// 对 IO 事件做一次封装
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
// Stop selecting this key while processing on its
// connection
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
// 提交到 Pool 中,让 Worker 去消费
workerPool.schedule(workRequest);
}
至此,我们已经明白 ZooKeeper 服务端,从启动到 NIO 提供服务的状态了,并且客户端连接后,最终请求会被包转被提交到 WorkerService.java 中被执行,WorkerService 只是一个执行池,最终处理请求的是 NIOServerCnxn.doIO(key)
。绕了很多圈- -
至此,我们假设,网络连接已经能够建立了。
-Dzookeeper.serverCnxnFactory="org.apache.zookeeper.server.NettyServerCnxnFactory" 可以选择启动 NettyServerCnxnFactory 连接管理器,这个实现可以提供 SSL 服务。
NIOServerCnxn 如何处理 IO 操作
NIO 的网络处理模型大多数其实是相似,看看 IORequest 后面会如何继续处理IO操作。
// 每一个请求的包装到包括:要做什么,和做事情的对象。
private class IOWorkRequest extends WorkerService.WorkRequest {
// 这个 IO 操作最终会传递到 cnxn.doIO 去处理
public void doWork() {
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
if (key.isReadable() || key.isWritable()) {
cnxn.doIO(key); // 执行IO操作
// Check if we shutdown or doIO() closed this connection
if (stopped) {
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
touchCnxn(cnxn);
}
}
}
NIOServerCnxn.java
/**
* Handles read/write IO on connection.
*/
void doIO(SelectionKey k) throws InterruptedException {
try {
if (!isSocketOpen()) {
LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
handleFailedRead();
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k); // 判断buffer里的数值是否已经足够 |len(data)|data...}
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
readPayload(); // 读取有效载荷,进行真正的操作。从这里切入
} else {
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) {
handleWrite(k);
if (!initialized && !getReadInterest() && !getWriteInterest()) {
throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
}
}
} catch (CancelledKeyException e) {
// ... 省略一系列异常处理
}
}
// 读取数据,进行操作
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
handleFailedRead();
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
incomingBuffer.flip();
packetReceived(4 + incomingBuffer.remaining());
if (!initialized) {
readConnectRequest(); // 处理连接请求
} else {
readRequest(); // 处理事务请求
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
至此 NIOServerCnxn.java 的工作就告一段落了。接下来它会通过 readRequest
和 readConnectRequest
将请求传递到 ZooKeeperServer
中去。
我们停一下,假设我们都理解了 NIO 提供的字节码网络层服务,现在来思考 NIOServerCnxn
的生命周期和它主要做的是什么工作:
- 当客户端连接到服务端时,由
NIOServerCnxnFactory.AcceptThread
构建被保存在SelectionKey
的attachment
中. - 内部持有的对象包括如下:
- NIOServerCnxnFactory
- SocketChannel 持有的客户端连接
- SelectorThread 管理的 Selector
- SelectionKey
- incomingBuffer outgoingBuffers // Buffer 缓冲
- 由此可见,NIOServerCnxn 是服务端用来维护客户端的连接对象,这个对象包含了
Buffer
、Socket
等信息,并提供以下服务:- 读取数据时:向下是直接读取 NIO 的 Buffer , 向上会提交对象到
RequestThrottler(限流器)
- 写数据时:通过写数据到 outgongBuffers,在唤醒Selector,SelectorThread 会通知 WorkerService 去进行 doIO 操作。
- NIOServerCnxn 还兼顾实现 Data 的 Watch 实现。暂不展开聊
- 读取数据时:向下是直接读取 NIO 的 Buffer , 向上会提交对象到
NIOServerCnxn
的 close 有很多中可能,具体可以查看 DisconnectReason。
后记
- 读源码,既要深入分析,也要抬头总结,分析项目的模块设计,建立树状知识网。不然很快就会忘了
- 硬吃 NIO 的源码比较吃力,需要及时补充 JDK-NIO 的知识和概念,自己去并发编程网读了文档,还有写了 NIO 的 Demo 后,对阅读 ZooKeeper 的网络连接源码更加容易了。
- ZooKeeper 网络层使用 JDK NIO 实现,网络层和应用层通过 NIOServerCnxn 作为中介进行数据处理
- 任何的复杂项目大多数都是自顶向下的设计,再深入阅读源码的过程中,最好可以分模块或者分层次阅读。
- 例如:可以尝试阅读 网络层 的,那就针对网络层的去读,超越网络层就可以暂停不读了
- 文章并没完全写下自己思考的内容,一方面是自己还没完全确认,所以不容易落笔,希望可以通过讨论继续完成学习。
- 文章有几张图的话,会容易很多。
预告
- ZooKeeper 在处理请求的时候,
RequestProcessor
非常有意思,在分析 ZooKeeper 数据模型和实现的时候会继续分析。