Zookeeper源码分析(三) ----- 单机模式(standalone)运行

1,143 阅读10分钟

zookeeper源码分析系列文章:

原创博客,纯手敲,转载请注明出处,谢谢!

一、Zookeeper单机启动原理

Zookeeper属于C/S架构,也就是传统的客户端-服务器模式,客户端发送请求,服务器响应请求。这和高性能网络框架Netty是一样的,因此我们也可以猜想到它的启动方式无非就是从main()方法开始,客户端和服务器各有一个main()方法。

那我们先来看看Zookeeper服务器端的启动过程,当你打开Zookeeper目录下/bin目录中zkServer.cmd文件你就会发现,其实Zookeeper的启动入口为org.apache.zookeeper.server.quorum.QuorumPeerMain类的main方法,无论你是单机模式启动Zookeeper还是复制模式启动Zookeeper,执行入口都是这个类,至于如何区别是哪种模式启动,该类会根据你配置文件的配置进行判断,具体的判断接下来将会详细讲解。

zkServer.cmd详细源代码:

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain // 设置主类入口
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%"  %ZOOMAIN% "%ZOOCFG%" %*  // 执行该类的main()方法

endlocal

下面先看看单机启动时序图:

1、首先执行main方法 2、解析传进来的配置文件路径,默认会去${baseDir}\conf\zoo.cfg找配置文件 3、创建NIOServerCnxnFactory进行监听客户端的连接请求,在Zookeeper中有两种ServerCnxnFactory,一种是NIOServerCnxnFactory,另一种是NettyServerCnxnFactory,前者为默认工厂,后者除非你在启动main方法时指定Systemzookeeper.serverCnxnFactory属性值为NettyServerCnxnFactory

下面将详细深入源码分析各个阶段是如何实现以及工作的。

二、Zookeeper单机模式(standalone)启动

  • 1、Zookeeper是如何解析配置文件的?

zk的属性配置分为两种:

1、Java System property:Java系统环境变量,也就是System.setProperty()设置的参数

2、No Java system property配置文件属性,也就是你在配置文件中配置的属性

配置文件的解析原理很简单,无非就是解析一些.properties文件中的键值对,其实Java已经提供了Properties类来代表.properties文件中所有键值对集合,我们可以使用Properties对象的load()方法将一个配置文件装载进内存,然后对该对象进行遍历就得到我们锁配置的属性值集合了。

说到Zookeeper中的配置文件解析,原理也和上面差不多,只不过是在变量键值对的时候多了一些Zookeeper自身的逻辑判断。ZooKeeper中的配置文件解析从QuorumPeerConfig类的parse()方法说起,源代码如下:

/**
 * Parse a ZooKeeper configuration file 解析一个配置文件
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    File configFile = new File(path);
    
    LOG.info("Reading configuration from: " + configFile);
    
    try {
    	if (!configFile.exists()) {
    		throw new IllegalArgumentException(configFile.toString() + " file is missing");
    	}
        // 声明一个Properties对象
    	Properties cfg = new Properties();
    	FileInputStream in = new FileInputStream(configFile);
    	try {
    	    // 传入一个配置文件输入流即可装载所有配置
    		cfg.load(in);
    	} finally {
    	    // 涉及到流的操作记得最后将流关闭
    		in.close();
    	}
        // 此处是zk自身的逻辑处理
    	parseProperties(cfg);
    } catch (IOException e) {
    	throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
    	throw new ConfigException("Error processing " + path, e);
    }
}

接下来我们来看看上面的parseProperties(cfg)方法,该方法太长了,硬着头皮啃完:

/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
    int clientPort = 0;
    String clientPortAddress = null;
    // 遍历所有的key-value键值对
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        // 注意这里要把首尾空格去掉
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        // 存储快照文件snapshot的目录配置
        if (key.equals("dataDir")) {
        	dataDir = value;
        	// 事务日志存储目录
        } else if (key.equals("dataLogDir")) {
        	dataLogDir = value;
        	// 客户端连接server的端口,zk启动总得有个端口吧!如果你没有配置,则会报错!一般我们会将端口配置为2181
        } else if (key.equals("clientPort")) {
        	clientPort = Integer.parseInt(value);
        	// 服务器IP地址
        } else if (key.equals("clientPortAddress")) {
        	clientPortAddress = value.trim();
        	// zk中的基本事件单位,用于心跳和session最小过期时间为2*tickTime
        } else if (key.equals("tickTime")) {
        	tickTime = Integer.parseInt(value);
        	// 客户端并发连接数量,注意是一个客户端跟一台服务器的并发连接数量,也就是说,假设值为3,那么某个客户端不能同时并发连接3次到同一台服务器(并发嘛!),否则会出现下面错误too many connections from /127.0.0.1 - max is 3
        } else if (key.equals("maxClientCnxns")) {
        	maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
        	minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
        	maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
        	initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
        	syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
        	electionAlg = Integer.parseInt(value);
        } else if (key.equals("quorumListenOnAllIPs")) {
        	quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
        	peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
        	peerType = LearnerType.PARTICIPANT;
            } else {
        	throw new ConfigException("Unrecognised peertype: " + value);
        }
	......

下面对解析的所有配置项用表格总结下: 所有的配置项都可以在官网查询到。

下面我们一起看看Zookkeeper的配置文件属性:

配置项 说明 异常情况 是否报错? 错误 or 备注
clientPort 服务端server监听客户端连接的端口 不配置 clientPort is not set
clientPortAddress 客户端连接的服务器ip地址 不配置 默认使用网卡的地址
dataDir 数据快照目录 不配置 dataDir is not set
dataLogDir 事务日志存放目录 不配置 默认跟dataDir目录相同
tickTime ZK基本时间单元(毫秒),用于心跳和超时.minSessionTimeout默认是两倍ticket 不配置 tickTime is not set
maxClientCnxns 同一ip地址最大并发连接数(也就是说同一个ip最多可以同时维持与服务器链接的个数) 不配置 默认最大连接数为60,设置为0则无限制
minSessionTimeout 最小会话超时时间,默认2*ticket 不配置 否,若minSessionTimeout > maxSessionTimeout,则报错 minSessionTimeout must not be larger than maxSessionTimeout
maxSessionTimeout 最大会话超时时间,默认20*ticket 不配置 不能小于minSessionTimeout
initLimit 允许follower同步和连接到leader的时间总量,以ticket为单位 不配置 initLimit is not set,如果zk管理的数据量特别大,则辞职应该调大
syncLimit followerleader之间同步的世间量 不配置 syncLimit is not set
electionAlg zk选举算法选择,默认值为3,表示采用快速选举算法 不配置 如果没有配置选举地址server,则抛Missing election port for server: serverid
quorumListenOnAllIPs 当设置为true时,ZooKeeper服务器将侦听来自所有可用IP地址的对等端的连接,而不仅仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的连接。 默认值为false 不配置
peerType 服务器的角色,是观察者observer还是参与选举或成为leader,默认为PARTICIPANT 不配置 若配置了不知支持的角色,则报Unrecognised peertype:
autopurge.snapRetainCount 数据快照保留个数,默认是3,最小也是3 不配置
autopurge.purgeInterval 执行日志、快照清除任务的时间间隔(小时) 不配置 默认是 0
server.x=[hostname]:nnnnn[:nnnnn] 集群服务器配置 不配置 单机:否;集群:是 zk集群启动将加载该该配置,每台zk服务器必须有一个myid文件,里边存放服务器的id,该id值必须匹配server.x中的x ; 第一个端口表示与leader连接的端口,第二个端口表示用于选举的端口,第二个端口是可选的
  • 2、Zookeeper是如何判断何种模式启动服务器的?

因为ZookeeperZkServer.cmd启动文件指定的统一入口为org.apache.zookeeper.server.quorum.QuorumPeerMain,那么我们就要问了,那ZK是怎么判断我要单机模式启动还是集群方式启动呢?答案是明显的,也就是取决于你在配置文件zoo.cfg中是否有配置server.x=hostname:port1:port2,以上的配置项表明我们想让ZK以集群模式运行,那么在代码中是如何体现的呢?

上面讲到ZK解析配置文件的原理,我们依旧走进parseProperties()方法,看看如下代码:

.....
// 此处解析配置文件以server.开头的配置
} else if (key.startsWith("server.")) {
    // server.3
    int dot = key.indexOf('.');
    long sid = Long.parseLong(key.substring(dot + 1));
    String parts[] = splitWithLeadingHostname(value);
    if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
    LOG.error(value + " does not have the form host:port or host:port:port "
    		+ " or host:port:port:type");
}
    LearnerType type = null;
    String hostname = parts[0];
    Integer port = Integer.parseInt(parts[1]);
    Integer electionPort = null;
    if (parts.length > 2) {
    electionPort = Integer.parseInt(parts[2]);
    }
if (parts.length > 3) {
    if (parts[3].toLowerCase().equals("observer")) {
    	type = LearnerType.OBSERVER;
    } else if (parts[3].toLowerCase().equals("participant")) {
    	type = LearnerType.PARTICIPANT;
    } else {
    	throw new ConfigException("Unrecognised peertype: " + value);
    }
}
if (type == LearnerType.OBSERVER) {
    observers.put(Long.valueOf(sid),
    		new QuorumServer(sid, hostname, port, electionPort, type));
    } else {
    // 如果配置了,那么就加进servers中,其中servers是一个本地缓存Map,用于存储配置的ip地址
    servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
}

如果配置了,那么serverssize>0,解析完成之后,回到QuorumPeerMaininitializeAndRun()方法:

 // 如果servers长度大于0,则集群方式启动,否则,单机启动
 if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }

从上面可以看出,单机启动的入口为ZooKeeperServerMain类,而统一的入口类为QuorumPeerMain,所以,在ZK中,服务器端的启动类就只有这两个了。

  • 3、单机模式下,Zookeeper是如何处理客户端请求的?

无论是哪种方式启动Zookeeper,它都必须对客户端的请求进行处理,那么ZK是如何处理客户端请求的呢?让我们一起来看看源码是怎么写的!

上面说到,Zk单机启动的入口类为ZooKeeperServerMain,我们一起看下其runFromConfig()方法:

/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 */
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // 创建一个ZooKeeperServer,ZooKeeperServer代表具体运行的zk服务器,包含监听客户端请求
        final ZooKeeperServer zkServer = new ZooKeeperServer();
        // 这个是表明上面创建的ZooKeeperServer线程执行完之后,当前主线程才结束,类似Thread的join()方法
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        // 关闭服务器时的回调处理器
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // 执行快照数据,日志的定时保存操作,指定保存路径
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // 创建ServerCnxnFactory,默认实现为NIOServerCnxnFactory,也可以指定为NettyServerCnxnFactory
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        // 启动服务器,将一个服务器zkServer丢给工厂,然后启动
        cnxnFactory.startup(zkServer);
        // 这里将会等待,除非调用shutdown()方法
        shutdownLatch.await();
        shutdown();
        // 这里会等待直到zkServer线程完成
        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}

了解完上面的代码,我们明白单机启动ZooKeeperServerZK做了什么工作,主要点在zk创建的是哪种工厂,至于NIOServerCnxnFactory的代码,我就不说了,大家有兴趣可以去看看。

回归正题,让我们进入NIOServerCnxnFactoryrun()方法中看看:

public void run() {
while (!ss.socket().isClosed()) {
try {
    // 每一秒轮询一次
	selector.select(1000);
	Set<SelectionKey> selected;
	synchronized (this) {
		selected = selector.selectedKeys();
	}
	ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
	Collections.shuffle(selectedList);
	for (SelectionKey k : selectedList) {
	    // 如果有读请求或者连接请求,则接收请求
		if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
			SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
			InetAddress ia = sc.socket().getInetAddress();
			int cnxncount = getClientCnxnCount(ia);
			// 这里对maxClientCnxns做出判断,防止DOS攻击
			if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
				LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
				sc.close();
			} else {
				LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
				sc.configureBlocking(false);
				SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
				NIOServerCnxn cnxn = createConnection(sc, sk);
				sk.attach(cnxn);
				addCnxn(cnxn);
			}
		// 如果有读请求且客户端之前有连接过的,则直接处理
		} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
			NIOServerCnxn c = (NIOServerCnxn) k.attachment();
			c.doIO(k);
		} else {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Unexpected ops in select " + k.readyOps());
			}
		}
	}
	selected.clear();
} catch (RuntimeException e) {
	LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
	LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}

看到这,我觉得对于Zk如何监听处理客户端的请求就清晰多了,上面的代码主要采用轮询机制,每一秒轮询一次,通过selector.select(1000)方法指定,这里的监听方式和传统的BIO不同,传统的网络监听采用阻塞的accept()方法,zk采用java的nio实现。

谢谢阅读~~