深入Netty(五)-Netty服务端对连接的处理

1,720 阅读5分钟

在上一篇介绍Netty服务端接收连接的时候,我们分析到连接接收了后,便被放入pipeline里面去执行读操作。代码如下:

//触发读事件,将该channel分配给worker线程处理
pipeline.fireChannelRead(readBuf.get(i));

所以要分析Netty如何处理连接,必须先要分析一下pipeline的工作机制。

1.Pipeline工作机制

pipeline的大概作用我们之前已经分析过了,一句话概括就是流水线模式。查看源码我们可以发现NioServerSocketChannel和NioSocketChannel自带的pipeline实现都是DefaultChannelPipeline,所以我们也从这个类开始分析;

首先查看构造方法:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        ...
        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

DefaultChannelPipeline构造需要一个Channel,pipeline和channel是互相持有的,通过channel可以获取到pipeline,通过pipeline也可以获取到channel。

其次既然是pipeline,自然就要有头部节点和尾部节点,正是head和tail。

接着我们看看pipeline是如何添加节点的,查看addLast方法(其他方法也是可以的,逻辑都相似)。

    public final ChannelPipeline addLast(ChannelHandler handler) {
        return addLast(null, handler);
    }
    
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            //给ChannelHandler一层封装AbstractChannelHandlerContext
            newCtx = newContext(group, filterName(name, handler), handler);
            //添加到队列末尾
            addLast0(newCtx);
            ...
        }
        callHandlerAdded0(newCtx);
        return this;
    }

查看一下newContext方法

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        //handle至少简单的持有一下,继续查看super里面的方法
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        //这个地方要留一下心。
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

mask方法大家可以进去看一下,这个方法主要作用就是计算出handler的类型,因为netty有读操作,有写操作,所以根据mask方法计算出一个值,来标记一下这个handle的操作类型。

newContext方法到这边就结束了。接下来查看一下addLast0方法:

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

这个逻辑一眼就可以明白了,添加到末尾只是添加到尾节点之前,因为tail节点永远是最后一个节点,同理添加到头节点也是添加到头节点之后,head节点必须在第一个。

这样添加节点的逻辑我们也整明白了。下面就开始正式分析pipeline的工作流程。

查看fireChannelRead方法:

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        //调用了一个静态的invokeChannelRead方法,我们进入该方法
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    //touch方法暂时不用太在意,后面的判断则是决定是在当前线程执行任务还是放入线程池执行。
    //关键的方法在于next.invokeChannelRead(m)
    //此处传入的next是head节点,所以我们进入head节点查看invokeChannelRead方法
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                //因为读方法要实现ChannelInboundHandler,所以此处先转换一下
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    //HeadContext的handler返回的是this
    //所以进入HeadContext的channelRead方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);
    }
    
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

    //到这儿我们就发现了,因为需要从头节点往后搜寻需要执行读操作的节点
    //所以Head节点必须是第一个节点,而之前mask方法计算出得值,也是用在了这个地方
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }
    
    //找到需要执行的读节点之后,便执行invokeChannelRead方法
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    //大家应该发现了,这个方法前面已经遇到过,所以直接进入next.invokeChannelRead(m)
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    //我们又到这儿了,还是先转换成ChannelInboundHandler,在执行handler方法。
    //然而不同的是此时的handler()取出的已经是我们自己定义的handler了

到这儿我们就已经找到了自定义handler的执行路径。如果定义了多个handler,而且想继续往下执行的话,我们只需在自己定义的handler的channelRead方法里再次调用ctx.fireChannelRead(msg)方法即可。这是不是很像tomcat的Filter?

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

2.连接是怎么传递给worker的

熟悉了pipeline的工作机制后,我们来分析一下连接是怎么传递给worker线程组的。

经过上一篇的分析:

    void init(Channel channel) {
        //...
        ChannelPipeline p = channel.pipeline();
        //...
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //注意这个地方的handel不是我们设置的childHandle
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

我们知道在ServerBootstrap的init方法中,boss线程组的pipeline里面添加了一个ServerBootstrapAcceptor,而NioServerSocketChannel接收到的线程直接执行了channelRead方法,所以我们查看一下这个类的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    //将我们自己定义的handler添加进连接里面
    child.pipeline().addLast(childHandler);
    //设置连接参数
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        //调用worker线程组来处理连接并添加监听器,在添加异常时关闭连接
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

所以经过ServerBootstrapAcceptor类之后,连接便被放入了worker线程组里面,然后经过pipeline的流转,最终会进入我们编写的handler里面进行处理。

Netty的流程分析到这儿就结束了。