在上一篇介绍Netty服务端接收连接的时候,我们分析到连接接收了后,便被放入pipeline里面去执行读操作。代码如下:
//触发读事件,将该channel分配给worker线程处理
pipeline.fireChannelRead(readBuf.get(i));
所以要分析Netty如何处理连接,必须先要分析一下pipeline的工作机制。
1.Pipeline工作机制
pipeline的大概作用我们之前已经分析过了,一句话概括就是流水线模式。查看源码我们可以发现NioServerSocketChannel和NioSocketChannel自带的pipeline实现都是DefaultChannelPipeline,所以我们也从这个类开始分析;
首先查看构造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
DefaultChannelPipeline构造需要一个Channel,pipeline和channel是互相持有的,通过channel可以获取到pipeline,通过pipeline也可以获取到channel。
其次既然是pipeline,自然就要有头部节点和尾部节点,正是head和tail。
接着我们看看pipeline是如何添加节点的,查看addLast方法(其他方法也是可以的,逻辑都相似)。
public final ChannelPipeline addLast(ChannelHandler handler) {
return addLast(null, handler);
}
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//给ChannelHandler一层封装AbstractChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
//添加到队列末尾
addLast0(newCtx);
...
}
callHandlerAdded0(newCtx);
return this;
}
查看一下newContext方法
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
//handle至少简单的持有一下,继续查看super里面的方法
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
//这个地方要留一下心。
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
mask方法大家可以进去看一下,这个方法主要作用就是计算出handler的类型,因为netty有读操作,有写操作,所以根据mask方法计算出一个值,来标记一下这个handle的操作类型。
newContext方法到这边就结束了。接下来查看一下addLast0方法:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
这个逻辑一眼就可以明白了,添加到末尾只是添加到尾节点之前,因为tail节点永远是最后一个节点,同理添加到头节点也是添加到头节点之后,head节点必须在第一个。
这样添加节点的逻辑我们也整明白了。下面就开始正式分析pipeline的工作流程。
查看fireChannelRead方法:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
//调用了一个静态的invokeChannelRead方法,我们进入该方法
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
//touch方法暂时不用太在意,后面的判断则是决定是在当前线程执行任务还是放入线程池执行。
//关键的方法在于next.invokeChannelRead(m)
//此处传入的next是head节点,所以我们进入head节点查看invokeChannelRead方法
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//因为读方法要实现ChannelInboundHandler,所以此处先转换一下
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
//HeadContext的handler返回的是this
//所以进入HeadContext的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
//到这儿我们就发现了,因为需要从头节点往后搜寻需要执行读操作的节点
//所以Head节点必须是第一个节点,而之前mask方法计算出得值,也是用在了这个地方
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
//找到需要执行的读节点之后,便执行invokeChannelRead方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
//大家应该发现了,这个方法前面已经遇到过,所以直接进入next.invokeChannelRead(m)
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
//我们又到这儿了,还是先转换成ChannelInboundHandler,在执行handler方法。
//然而不同的是此时的handler()取出的已经是我们自己定义的handler了
到这儿我们就已经找到了自定义handler的执行路径。如果定义了多个handler,而且想继续往下执行的话,我们只需在自己定义的handler的channelRead方法里再次调用ctx.fireChannelRead(msg)方法即可。这是不是很像tomcat的Filter?
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
2.连接是怎么传递给worker的
熟悉了pipeline的工作机制后,我们来分析一下连接是怎么传递给worker线程组的。
经过上一篇的分析:
void init(Channel channel) {
//...
ChannelPipeline p = channel.pipeline();
//...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//注意这个地方的handel不是我们设置的childHandle
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
我们知道在ServerBootstrap的init方法中,boss线程组的pipeline里面添加了一个ServerBootstrapAcceptor,而NioServerSocketChannel接收到的线程直接执行了channelRead方法,所以我们查看一下这个类的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//将我们自己定义的handler添加进连接里面
child.pipeline().addLast(childHandler);
//设置连接参数
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//调用worker线程组来处理连接并添加监听器,在添加异常时关闭连接
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
所以经过ServerBootstrapAcceptor类之后,连接便被放入了worker线程组里面,然后经过pipeline的流转,最终会进入我们编写的handler里面进行处理。
Netty的流程分析到这儿就结束了。