Netty Deep Dive (4): How the Netty Server Accepts Connections


In the previous article we covered the main roles of Netty's three core classes and sketched a rough picture of how Netty runs. In this one we dig into the server-side code.

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup(8);

        ServerBootstrap b = new ServerBootstrap();
        try {

            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<NioSocketChannel>() {
                 @Override
                 protected void initChannel(NioSocketChannel ch) throws Exception {
                     //these two handlers ship with Netty
                     ch.pipeline().addLast(new StringDecoder());
                     ch.pipeline().addLast(new StringEncoder());
                     //this handler implements our custom logic
                     ch.pipeline().addLast(new ServerExampleHandler());
                 }
             });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

1. Creating the EventLoopGroup

We start by creating two EventLoopGroups. As mentioned last time, this class's main job is to hold a set of EventLoops and spread incoming tasks evenly across them. Let's look at its chain of constructors:

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    //the chooser factory is introduced here
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        //...

        children = new EventExecutor[nThreads];

        //the thread count determines how many EventLoops are held internally
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // ...
            } finally {
                //...
            }
        }
        //create the chooser from the chooser factory
        chooser = chooserFactory.newChooser(children);
        //...
    }

Tracing through the constructors is straightforward. For now, focus on two parameters: the thread count we passed in, and the chooser factory introduced at the second-to-last level. To keep things readable, only the essential logic is shown.

Next, let's look at the newChild method and how the chooser is created.

    //newChild is implemented in NioEventLoopGroup
    //it shows clearly that each child held by a NioEventLoopGroup is a NioEventLoop
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
    
    //creating the chooser is also simple: first check whether the number of
    //children is a power of two (2, 4, 8, 16, ...); if so, build the power-of-two
    //chooser, otherwise the generic one. The next() methods of both are worth reading.
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
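The point of the power-of-two special case is that `idx & (length - 1)` is equivalent to `idx % length` when `length` is a power of two, but avoids the division. Here is a simplified, self-contained sketch of the two `next()` strategies; the class and method names are illustrative, not Netty's:

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of the two round-robin strategies: with a power-of-two
// executor count, a bitwise AND replaces the modulo.
class ChooserSketch {
    // same check Netty uses: a power of two equals its own lowest set bit
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // strategy used when the executor array length is a power of two
    static int nextPowerOfTwo(AtomicLong idx, int length) {
        return (int) (idx.getAndIncrement() & (length - 1));
    }

    // generic strategy for any length
    static int nextGeneric(AtomicLong idx, int length) {
        return (int) Math.abs(idx.getAndIncrement() % length);
    }
}
```

Both strategies walk the executor array round-robin; the AND version simply wraps around for free once the counter passes `length - 1`.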

2. Configuring the ServerBootstrap

ServerBootstrap is the server-side startup bootstrap. Let's see what we configured before starting it.

    //first, the parentGroup is handed to the superclass; the childGroup is kept by ServerBootstrap itself
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }
    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }
    
    //next, set the server channel class; a reflective factory is used here, which requires the channel class to have a no-arg constructor
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    
    //finally, set the childHandler
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

We're still focused on tracing the flow, so we won't configure much more, but keep these three settings in mind.
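Notice that all three setters chain: each one returns the bootstrap itself and throws if a single-set field is assigned twice. A hypothetical mini-builder (names invented for illustration, not Netty's classes) captures that pattern:

```java
// Hypothetical mini-builder showing the "validate, set once, return this" style
// used by group(), channel() and childHandler().
class MiniBootstrap {
    private Object group;
    private Object childHandler;

    MiniBootstrap group(Object g) {
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = java.util.Objects.requireNonNull(g, "group");
        return this; // returning this is what enables the fluent chain
    }

    MiniBootstrap childHandler(Object h) {
        this.childHandler = java.util.Objects.requireNonNull(h, "childHandler");
        return this;
    }

    boolean configured() {
        return group != null && childHandler != null;
    }
}
```

Usage mirrors the main method above: `new MiniBootstrap().group(boss).childHandler(init)`.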

3. Inside the bind method

In plain JDK programming we also call ServerSocketChannel's bind method. So how does Netty's bind differ from the JDK's?

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // registration already finished, so bind immediately
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        //...
                        promise.setFailure(cause);
                    } else {
                        //...
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

The first few methods merely validate the configuration and resolve the address; the real work starts in doBind.

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            //...
        }
        ChannelFuture regFuture = config().group().register(channel);
        //...
        return regFuture;
    }

First, we enter the initAndRegister method. It does three things:

  1. Create the Channel
  2. Initialize the Channel
  3. Register the Channel

Creating the Channel

Remember the three settings from earlier? The channel here comes from the second one, so we head to NioServerSocketChannel's no-arg constructor.

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

This ServerSocketChannel is the very one we use in the JDK, which confirms the guess from the previous article: a Netty channel wraps a JDK socket.
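Stripped of the wrapper, newSocket boils down to a JDK call you could make yourself. A runnable sketch against the plain JDK API (the class name here is made up):

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// The same call NioServerSocketChannel makes: obtain the platform's default
// SelectorProvider and open a JDK ServerSocketChannel from it.
public class OpenServerChannel {
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ch = open();
        System.out.println(ch.isOpen()); // prints "true"
        ch.close();
    }
}
```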

Moving on:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

In AbstractChannel we find where the Unsafe instance and the pipeline are created. As before, just make a note of this for now; there's no need to dwell on it.

Initializing the Channel

That wraps up channel creation. Next is channel initialization, so back to ServerBootstrap's init method.

    void init(Channel channel) {
        //...
        ChannelPipeline p = channel.pipeline();
        //...
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //note: this handler is not the childHandler we set
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

The rest of the code just sets parameters, so we'll skip it. The main job of this init method is to add an initializer handler to the pipeline of the channel we just created. Let's take a quick look at the pipeline and ChannelInitializer:

    //the pipeline was created in the AbstractChannel constructor above;
    //following newChannelPipeline() leads to the DefaultChannelPipeline class
    //note that a freshly initialized pipeline holds only a head and a tail context
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

Counting the ChannelInitializer we just added, the pipeline now holds three contexts. We'll analyze the pipeline's role in detail later; for now just remember how many components it contains.
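The head/tail structure above is just a doubly linked list with two fixed sentinels, where addLast splices a new context in immediately before tail. A toy sketch (not Netty's classes, names invented) of that list:

```java
import java.util.ArrayList;
import java.util.List;

// Toy doubly linked list with head/tail sentinels, mimicking how
// DefaultChannelPipeline stores its handler contexts.
class PipelineSketch {
    static class Ctx {
        final String name;
        Ctx prev, next;
        Ctx(String name) { this.name = name; }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    PipelineSketch() {
        // same wiring as the DefaultChannelPipeline constructor
        head.next = tail;
        tail.prev = head;
    }

    // insert right before the tail sentinel, like addLast
    void addLast(String name) {
        Ctx ctx = new Ctx(name);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            out.add(c.name);
        }
        return out;
    }
}
```

Sentinels mean addLast never has to special-case an empty list: there is always a node before tail to link against.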

Now let's look at ChannelInitializer:

    //this is the method we overrode
    protected abstract void initChannel(C ch) throws Exception;

    //initChannel is triggered when the handler is added
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            //...
            if (initChannel(ctx)) {
                //...
                removeState(ctx);
            }
        }
    }
    
    //this method runs our overridden initChannel and, once done, removes the ChannelInitializer from the pipeline
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                //...
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

Knowing what these classes do, we can see that once handler setup completes, the pipeline of the NioServerSocketChannel holds three contexts: head, tail, and ServerBootstrapAcceptor.

Registering the Channel

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    //the thread calling this is our main thread, not the eventLoop thread, so we enter execute
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    //...
                }
            }
    }
    
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            //...
        }
    }
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //...
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } //...
    }

In doStartThread we see the call to SingleThreadEventExecutor.this.run(), which is exactly the NioEventLoop run method we analyzed in the previous article.
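The interesting part of startThread is that any number of callers may race into execute, yet the compareAndSet guarantees exactly one of them actually starts the loop. A reduced sketch of that idea (hypothetical names, tasks run inline rather than queued):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Reduced sketch of the CAS-guarded lazy start in startThread(): many threads
// may call execute(), but only the first successful compareAndSet "starts"
// the event loop thread.
class LazyStartSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger starts = new AtomicInteger(); // counts real starts

    void execute(Runnable task) {
        // every call submits a task (run inline here for simplicity);
        // only the CAS winner performs the one-time start
        if (state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // stands in for doStartThread()
        }
        task.run();
    }

    int startCount() {
        return starts.get();
    }
}
```

However many tasks are submitted, startCount stays at one, which is why register0 can be safely scheduled from the main thread before the loop thread even exists.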

So after registration the NioEventLoop keeps polling the Selector's state, and the NioServerSocketChannel's pipeline is ready. Let's look at NioEventLoop's run method.

    protected void run() {
        int selectCnt = 0;
        for (; ; ) {
            //...
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        //process the channels whose keys are ready on the Selector
                        processSelectedKeys();
                    }
                } finally {
                    //run the tasks in the task queue
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
        }
    }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            //handle the connections on which events were detected
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        //...
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            }
            //...
            if (!i.hasNext()) {
                break;
            }
        }
    }
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //...
        try {
            //...
            //the ServerSocketChannel is interested in OP_ACCEPT events
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //see this method on NioServerSocketChannel
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //fire the read event, handing the new channel off to a worker
                pipeline.fireChannelRead(readBuf.get(i));
            }
            //...
        } finally {
            //...
        }
    }

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            //...
        }
        return 0;
    }
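Stripped of Netty's machinery, the accept path above (select, check OP_ACCEPT, accept, wrap) is plain JDK NIO. A runnable sketch of a single loop iteration, with an invented class name:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// One turn of the select/accept cycle that NioEventLoop plus unsafe.read()
// perform: register for OP_ACCEPT, poll the selector, accept ready connections.
public class PlainAcceptLoop {
    public static int pollOnce(Selector selector) throws IOException {
        int accepted = 0;
        selector.selectNow(); // non-blocking poll, like one turn of run()
        for (SelectionKey k : selector.selectedKeys()) {
            if (k.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) k.channel();
                if (server.accept() != null) { // Netty wraps this in a NioSocketChannel
                    accepted++;
                }
            }
        }
        selector.selectedKeys().clear();
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(0)); // ephemeral port
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(pollOnce(selector)); // prints "0": no client yet
        }
    }
}
```

What Netty adds on top of this skeleton is everything we traced: the accepted SocketChannel is wrapped in a NioSocketChannel and fired down the pipeline to the ServerBootstrapAcceptor.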

That completes the server-side logic for accepting connections. In the next article we'll analyze how Netty processes the connection.