Netty 源码解析系列-服务端启动流程解析

1,179 阅读9分钟

netty源码解析系列

1.服务端启动例子(基于4.0.31.Final)

   public class Server {
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workGroup;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("服务启动");
        Server server = new Server();
        server.start();
    }
    private void start() throws InterruptedException {
       try {
             serverBootstrap=new ServerBootstrap();
             bossGroup = new NioEventLoopGroup();
             workGroup = new NioEventLoopGroup(4);
             serverBootstrap.group(bossGroup, workGroup)
                               .channel(NioServerSocketChannel.class)
                               .option(ChannelOption.SO_BACKLOG, 128)
                               .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                               .childOption(ChannelOption.SO_KEEPALIVE, true)
			        .handler(new InitHandler())
                               .childHandler(new IOChannelInitialize());
             ChannelFuture future = serverBootstrap.bind(8802).sync();
             future.channel().closeFuture().sync();
          } finally {
                 bossGroup.shutdownGracefully();
                 workGroup.shutdownGracefully();
          }

}

    private class IOChannelInitialize extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("initChannel");
            ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
            ch.pipeline().addLast(new IOHandler());
        }
    }
}

步骤说明

  • 1.1 创建 ServerBootstrap 实例,它是 netty 的启动辅助类,提供了一系列的方法用于设置服务 端启动相关的参数。底层通过门面模式对各种能力进行抽象和封装,尽量不需要用户跟过 多的底层 API 打交道,降低用户的开发难度

  • 1.2 NioEventLoopGroupnetty Reactor 线程池,bossGroup 监听和 accept 客户端连接,workGroup 则处理 IO ,编解码

  • 1.3 绑定服务端 NioServerSocketChannel

  • 1.4 设置一些参数

  • 1.5 初始化 pipeline 并绑定 handlerpipeline 是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler ,设置系统提供的 IdleStateHandler 和自定义 IOHandler

  • 1.6 serverBootstrap.bind(8802) 这里才是启动服务端绑定端口

  • 1.7 future.channel().closeFuture().sync(); 等待服务端关闭

  • 1.8 优雅关闭

2. 源码分析

2.1 NioEventLoopGroup

    NioEventLoopGroup 不仅仅是 I/O 线程,除了负责 I/O 的读写,还负责系统 Task 和定时任务

     public NioEventLoopGroup(int nThreads) {
           this(nThreads, null);
       }
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
   this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

    继续,以下是精简代码

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
			...
	if (threadFactory == null) {
    		threadFactory = newDefaultThreadFactory();
	}
	children = new SingleThreadEventExecutor[nThreads];
	if (isPowerOfTwo(children.length)) {
    		chooser = new PowerOfTwoEventExecutorChooser();
	} else {
    		chooser = new GenericEventExecutorChooser();
	}
	for (int i = 0; i < nThreads; i ++) {
		...
        	children[i] = newChild(threadFactory, args);
		...
     }
    

     MultithreadEventExecutorGroup 实现了线程的创建和线程的选择,我们看看 newChild 方法( NioEventLoopGroup 类的方法),newChild 实例化线程

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

    创建了一个 NioEventLoop

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

    跟着 super

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}

    代码有精简,继续

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        	@Override
        	public void run() {
    			SingleThreadEventExecutor.this.run();
	}
 }
});

    在这里实例化了一个线程,并在 run 中调用 SingleThreadEventExecutorrun 方法,这个线程在哪里启动的呢,我们继续往下看
    总结:
          NioEventLoopGroup 实际就是 Reactor 线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。

2.2 ServerBootstrap

     ServerBootstrap 是服务端的启动辅助类,父类是 AbstractBootstrap ,与之相对应的客户端启动辅助类是 Bootstrap

    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
		volatile EventLoopGroup group;
		private volatile ChannelFactory<? extends C> channelFactory;
		private volatile SocketAddress localAddress;
		private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
		private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
		private volatile ChannelHandler handler;
	  }

2.2.1 设置booss和work线程池

     将 bossGroup 传给父类,workGroup 赋值给 serverBootstrapchildGroup

      public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    		super.group(parentGroup);
    		if (childGroup == null) {
        		throw new NullPointerException("childGroup");
    		}
    		if (this.childGroup != null) {
        		throw new IllegalStateException("childGroup set already");
    		}
    		this.childGroup = childGroup;
    		return this;
}

2.2.2 设置NioServerSocketChannel处理连接请求

serverBootstrap.channel(NioServerSocketChannel.class) 
   public B channel(Class<? extends C> channelClass) {
    			if (channelClass == null) {
        			throw new NullPointerException("channelClass");
   			 }
   			 return channelFactory(new BootstrapChannelFactory<C>(channelClass));
   }

     继续跟 new BootstrapChannelFactory

      private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
    			private final Class<? extends T> clazz;
    			BootstrapChannelFactory(Class<? extends T> clazz) {
        			this.clazz = clazz;
    			}
    			@Override
			public T newChannel() {
        			try {
            				return clazz.newInstance();
        			} catch (Throwable t) {
            				throw new ChannelException("Unable to create Channel from class " + clazz, t);
        			}
    			}
		}

     BootstrapChannelFactory 是一个继承了 ChannelFactory 的内部类,从名称上就能看出,这是一个 channel 工厂类,重写了父类的 newChannel 方法,通过反射创建 NioServerSocketChannel 实例,后面会告诉你是在哪里调用到的

2.2.3 设置channel通道块的值

         serverBootstrap.option(ChannelOption.SO_BACKLOG, 128) 
          public <T> B option(ChannelOption<T> option, T value) {
    			if (option == null) {
        			throw new NullPointerException("option");
    			}
    			if (value == null) {
        			synchronized (options) {
            				options.remove(option);
        			}
    			} else {
        			synchronized (options) {
            			options.put(option, value);
        			}
    			}
    			return (B) this;
          }

     这里的 option 方法是父类 AbstractBootstrap 的方法,options 是一个有序的非线程安全的双向链表,加锁添加

2.2.4 serverBootstrap.childOption

		public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    			if (childOption == null) {
       				 throw new NullPointerException("childOption");
    			}
    			if (value == null) {
        			synchronized (childOptions) {
            				childOptions.remove(childOption);
       				 }
    			} else {
        			synchronized (childOptions) {
            				childOptions.put(childOption, value);
        			}
    			}
    			return this;
		}

     childOption 是子类 serverBootstrap 的方法
     childOptionoption 的区别:
         option : 主要是设置 ServerChannel 的一些选项
         childOption : 主要设置 ServerChannel 的子 channel 的选项,即 option
                       针对的是 boss 线程而 childOption 针对的是 work 线程池

2.2.5 设置服务端NioServerSocketChannel的Handler

         serverBootstrap.handler(new InitHandler())
         public B handler(ChannelHandler handler) {
    			if (handler == null) {
        			throw new NullPointerException("handler");
    			}
    			this.handler = handler;
    			return (B) this;
         }

2.2.6 serverBootstrap.childHandler()

           public ServerBootstrap childHandler(ChannelHandler childHandler) {
    			   if (childHandler == null) {
        				throw new NullPointerException("childHandler");
    			   }
       			   this.childHandler = childHandler;
    			   return this;
               
           }

handlerchildHandler 的区别
    Handler 是属于服务端 NioServerSocketChannel ,只会创建一次 childHandler 是属于每一个新建的 NioSocketChannel ,每当有一个连接上来,都会调用

2.2.7 真正的启动过程是在这里执行,我们看看bind()方法

         serverBootstrap.bind(8802).sync() 
         public ChannelFuture bind(int inetPort) {
    			return bind(new InetSocketAddress(inetPort));
		}
  • (1) 通过端口号创建一个 InetSocketAddress ,继续 bind
 public ChannelFuture bind(SocketAddress localAddress) {
    			validate();
    			if (localAddress == null) {
        			throw new NullPointerException("localAddress");
    			}
    			return doBind(localAddress);
 }
  • (2) validate() 方法进行一些参数验证,我们直接看 doBind()
        private ChannelFuture doBind(final SocketAddress localAddress) {
    			final ChannelFuture regFuture = initAndRegister();
    			final Channel channel = regFuture.channel();
			if (regFuture.cause() != null) {
    				return regFuture;
			}
			if (regFuture.isDone()) {
    				ChannelPromise promise = channel.newPromise();
    				doBind0(regFuture, channel, localAddress, promise);
    				return promise;
			} else {
				final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
				regFuture.addListener(new ChannelFutureListener() {
    					@Override
    					public void operationComplete(ChannelFuture future) throws Exception {
        					Throwable cause = future.cause();
        					if (cause != null) {
 							promise.setFailure(cause);
                				} else {
                    					promise.executor = channel.eventLoop();
                				}
                					doBind0(regFuture, channel, localAddress, promise);
            					}
        				});
        			return promise;
    			}
		}
  • (3.1) 先看 initAndRegister ( AbstractBootstrap 类 ),去掉了一些不重要的
			final ChannelFuture initAndRegister() {
    				final Channel channel = channelFactory().newChannel();
        			init(channel);
				ChannelFuture regFuture = group().register(channel);
				return regFuture;
			}

     channelFactoryserverBootstrap.channel() 时创建的,在这里调用反射创建 NioServerSocketChannel 实例

  • (3.2.1) 再看 init(channel) 方法( ServerBootstrap 类)
    @Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
    	for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
        	@SuppressWarnings("unchecked")
        	AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
        	channel.attr(key).set(e.getValue());
    	}
     }
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
            	ChannelHandler handler = handler();
            	if (handler != null) {
                	pipeline.addLast(handler);
            	}
            	pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }	
    });
}

options()serverBootstrap.option() 赋值的 AbstractBootstrap 类的 options 双向链表成员变量,在这里将 optionsattrs 注入 channelP.addLast()NioServerSocketChannel 加入新的 handler (处理器),这里 pipeline 类似于 Servlet 的过滤器,管理所有 handler

  • (3.2.2) 再看 group().register() 方法
        这里的 groupbossGroup(NioEventLoopGroup----▷MultithreadEventLoopGroup) ,多次跳转到 SingleThreadEventLoop 类的 register() 方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}
  • (3.2.3) 清除一些不重要的代码,下面才是真正的注册
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
       try {
    	    eventLoop.execute(new OneTimeTask() {
        	@Override
        	public void run() {
            	register0(promise);
        	}
    	    });
        } catch (Throwable t) {
        }
    }
}

     eventLoop.inEventLoop() 用来判断启动线程与当前线程是否相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同。

  • (3.2.4) 这里线程还未启动,走 eventLoop.execute() ,这个 execute() 方法是 SingleThreadEventExecutor 类的
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
          reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
  • (3.2.5) 启动线程
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }
}

     我们在最开始2.1里面 SingleThreadEventExecutor 构造方法内的 thread 就是在这里启动的,我们再回到2.1的

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        	@Override
        	public void run() {
    			SingleThreadEventExecutor.this.run();
	}
 }
});
  • (3.2.6) 打开 SingleThreadEventExecutor.this.run() ;
          @Override
	protected void run() {
    		for (;;) {
        		boolean oldWakenUp = wakenUp.getAndSet(false);
        		try {
            			if (hasTasks()) {
                			selectNow();
            			} else {
                			select(oldWakenUp);
   					if (wakenUp.get()) {
        					selector.wakeup();
    					}
				}
			cancelledKeys = 0;
			needsToSelectAgain = false;
			final int ioRatio = this.ioRatio;
			if (ioRatio == 100) {
   				processSelectedKeys();
    				runAllTasks();
			} else {
    				final long ioStartTime = System.nanoTime();
    				processSelectedKeys();
    				final long ioTime = System.nanoTime() - ioStartTime;
    				runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
			}
			if (isShuttingDown()) {
    				closeAll();
    				if (confirmShutdown()) {
        				break;
    				}
			}
		} catch (Throwable t) {
            		try {
                		Thread.sleep(1000);
            		} catch (InterruptedException e) {
            		}
        	 }
    	   }
	  }

     在这里异步执行,轮询 select 客户端的 accept ,并且 runAllTasks 所有的任务

  • (3.3) 我们再看 (3.1) 里面的 ChannelFuture regFuture = group().register(channel); 跳转到 SingleThreadEventLoopregister 方法
@Override
public ChannelFuture register(Channel channel) {
	...
   	channel.unsafe().register(this, promise);
	return promise;
}

     以下是精简后的代码(位于 AbstractChannel 类的 AbstractUnsafe 内部类)

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
	...
	eventLoop.execute(new OneTimeTask() {
    	@Override
    	public void run() {
        	register0(promise);
    	}
	});
	...
}
private void register0(ChannelPromise promise) {
	...
	doRegister();
	...
	if (firstRegistration && isActive()) {
    		pipeline.fireChannelActive();
	}
	...
}

     继续(位于 AbstractNioChannel 类)

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
	...
	selectionKey = javaChannel().register(eventLoop().selector, 0, this);
	...        
}
}        

     将 NioServerSocketChannel 注册到 boss 线程池 NioEventLoopSelector 上。
在这里应该注册 OP_ACCEPT(16) 到多路复用器上
注册0的原因:
     (1)注册方法是多态的,它既可以被 NioServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或写操作
    (2)通过 SelectionKeyinterestOps(int ops) 方法可以方便地修改监听操作位

     再看 pipeline.fireChannelActive()

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

@Override
public Channel read() {
    pipeline.read();
    return this;
}
@Override
public ChannelPipeline read() {
    tail.read();
    return this;
}
@Override
public ChannelHandlerContext read() {
	...
        next.invokeRead();
	...
}
private void invokeRead() {
    try {
        ((ChannelOutboundHandler) handler()).read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

     进到 HeadContextread

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

@Override public final void beginRead() { ... doBeginRead(); ... }

@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
    return;
}

final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
    return;
}
readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

     最终在这里将 selectionKey 的监听操作位改为 OP_READ

  • (4) 再看 doBind0( ) 方法
 private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

     将方法丢到 reactor 线程池任务队列中执行,会先判断注册是否成功,成功则继续执行bind方法

  • (5) 执行 bind( ) 方法
      @Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
	...
	final AbstractChannelHandlerContext next = findContextOutbound();
	EventExecutor executor = next.executor();
	...
	next.invokeBind(localAddress, promise);
	...
}

    由于 bind 事件是出站事件,寻找出站的 handler ,执行 invokeBind( ) 方法

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
	...
	doBind(localAddress);
	...
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

     经过多层 bind 深入,最后在这里可以看到,还是会调用Java底层的nio进行 socket bind 自此,服务端启动流程解析完毕,我们总结一下
     ① 通过 ServerBootstrap 辅助启动类,配置了 reactor 线程池,服务端 Channel ,一些配置参数,客户端连接后的 handler
     ② 将 ServerBootstrap 的值初始化,并注册 OP_ACCEPT 到多路复用器
     ③ 启动 reactor 线程池,不断循环监听连接,处理任务
     ④ 绑定端口