浅谈Netty中的ChannelPipeline

1,931 阅读5分钟

Netty中另外两个重要组件—— ChannelHandle,ChannelHandleContext,Pipeline。Netty中I/O事件的传播机制以及数据的过滤和写出均由它们负责。

pipeline的初始化

步骤

  • pipeline在创建Channel的时候被创建
    • AbstractChannel中 pipeline = newChannelPipeline()-DefaultChannelPipeline
  • pipeline节点数据结构:ChannelHandlerContext的双向链表
  • pipeline中的两大哨兵:head和tail

分析

  • pipeline在创建Channel的时候被创建
    • 在服务端Channel和客户端Channel创建的时候,调用父类AbstractChannel初始化时候会对pipeline进行初始化。
    // AbstractChannel(..)
    protected AbstractChannel(Channel parent) {
        ...
        // 创建pipeline
        pipeline = newChannelPipeline();
    }
    // newChannelPipeline()
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    // DefaultChannelPipeline(..)
    protected DefaultChannelPipeline(Channel channel) {
        ...
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
  • 看下HeadContext和TailContext构造方法
    final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
        ...
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
        ...    
    }
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            ...
    }
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
      ...
      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
          this.name = ObjectUtil.checkNotNull(name, "name");
          this.pipeline = pipeline;
          this.executor = executor;
          this.inbound = inbound;
          this.outbound = outbound;
          ...
      }

head与tail它们都会调用父类AbstractChannelHandlerContext构造器去完成初始化,由此我们可以预见ChanelPipeline里面存放的是一个个ChannelHandlerContext,根据DefaultChannelPipeline构造方法我们可以知道它们数据结构为双向链表,根据AbstractChannelHandlerContext构造方法,我们可以发现head指定的为出栈处理,而tail指定的为入栈处理器。

  • pipeline中的两大哨兵:head和tail

pipeline里面的事件传播机制我们接下来验证,但是我们可以推测出入栈从head开始传播,因为它是出栈处理器,所以它只管往下传播不做任何处理,一直到tail会结束。出栈从tail开始传播,因为他是入栈处理器,所以它只管往下传播事件即可,也不做任何处理。这么看来对于入栈,从head开始到tail结束;对于出栈恰恰相反,从tail开始到head结束。

添加删除ChannelHandler

步骤

  • 添加ChannelHandler流程
    • 判断是否重复添加
      • filterName(..)
    • 创建节点并添加至链表
    • 回调添加完成事件
      • callHandlerAdded0(..)
  • 删除ChannelHandler流程
    • 找到节点
    • 链表的删除
    • 回调删除handler事件

分析

  • 判断是否重复添加
    // filterName(..)
    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
    // 判断重名
    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
    // 找有没有同名的context
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }
  • 创建节点并添加至链表
    // 插入到链表中tail节点的前面。
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
  • 顺着callHandlerAdded0(..)方法一直跟到AbstractChannelHandlerContext的callHandlerAdded(..)
    final void callHandlerAdded() throws Exception {
        ...
        if (setAddComplete()) {
            // 调用具体handler的handlerAdded方法
            handler().handlerAdded(this);
        }
    }
  • 删除ChannelHandler流程
  • 找到节点
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    // 相同堆内地址即为找到
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }
  • 链表的删除
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
  • 回调handler remove方法
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

事件和异常的传播

步骤

  • inBound事件的传播
    • ChannelRead事件的传播
    • SimpleInBoundHandler处理器
  • outBound事件的传播
    • write事件的传播
  • 异常的传播
    • 异常触发链

分析

  • inBound事件的传播分析
    // 省略代码
    ... 
    serverBootstrap
    ...
    .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Inbound1())
                                .addLast(new InBound2())
                                .addLast(new Inbound3());
                    }
    });
    ...
    public class Inbound1 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound1: " + msg);
            ctx.fireChannelRead(msg);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.channel().pipeline().fireChannelRead("hello cj");
        }
    }
    public class Inbound2 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound2: " + msg);
            ctx.fireChannelRead(msg);
        }

    }
    public class Inbound3 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound3: " + msg);
            ctx.fireChannelRead(msg);
        }
    }
  • 我们来画一下现在客户端接入做入栈处理时,客户端Channel的pipeline中的情况:

从head开始一直向下一个inboud传播直到tail结束,也可以看到ChannelHandlerContext起到的正是中间纽带的作用, 它能拿到handle也可以向上获取到channel与pipeline,一个channel只会有一个pipeline,一个pipeline可以有多个入栈handler和出栈handler,而且每个handler都会被ChannelHandlerContext包裹着。事件传播依赖的ChannelHandlerContext的fire*方法。

  • 我们来运行下代码验证下: telnet创建一个客户端连接
  • 控制台打印

按照我们上边说的那样 InBoud1 -> InBound2 -> InBoud3

  • outBound事件的传播分析
public class Outbound1 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound1 write:" + msg);
        ctx.write(msg, promise);
    }

}

public class Outbound2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound2 write:" + msg);
        ctx.write(msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(()-> {
            ctx.channel().pipeline().write("hello cj...");
        }, 5, TimeUnit.SECONDS);
    }

}

public class Outbound3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound3 write:" + msg);
        ctx.write(msg, promise);
    }

}
  • 我们来画一下现在向客户端写出时间做出栈处理时,客户端Channel的pipeline中的情况:

与入栈事件传递顺序是完全相反的,也就是从链表尾部开始。

  • 我们验证下结果
  • 异常的传播
public class Inbound1 extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound1...");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new RuntimeException("cj test throw caught...");
    }
}
public class Inbound3 extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound2...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound1 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound1...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound2...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound3...");
        super.exceptionCaught(ctx, cause);
    }

}

异常的传播过程是从head一直遍历到tail结束,并在tail中将其打印出来。

  • 直接验证下结果
  • 对于ctx.write()和ctx.pipeline().write()有和不同
  ctx.write("hello cj...");
  ctx.pipeline().write("hello cj...");

ctx.write(..) 我们按照上面的内容是可以想到的,ctx.write其实是直接激活当前节点的下一个节点write,所以它不会从尾部开始向前遍历所有的outbound,而ctx.pipeline().write(..)我们看源码可以知道,它先调用pipeline的write方法,跟踪源码(下图)可以发现,他是从tail开始遍历的,所有的outboud会依次被执行。同理inbound也是如此

源码地址

github.com/coderjia061…