Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装

5,418 阅读8分钟

前言

本系列为本人Java编程方法论 响应式解读系列的Webflux部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:

Rxjava源码解读与分享:www.bilibili.com/video/av345…

Reactor源码解读与分享:www.bilibili.com/video/av353…

NIO源码解读相关视频分享: www.bilibili.com/video/av432…

NIO源码解读视频相关配套文章:

BIO到NIO源码的一些事儿之BIO

BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 中

BIO到NIO源码的一些事儿之NIO 下 之 Selector

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下

其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。

HttpServer 的封装

本书主要针对Netty服务器来讲,所以读者应具备有关Netty的基本知识和应用技能。接下来,我们将对Reactor-netty从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是Reactor-netty 0.7.8.Release这个版本,但现在已有0.8版本,而且0.70.8版本在源码细节有不小的变动,这点给大家提醒下。我会针对0.8版本进行全新的解读。

HttpServer 的引入

我们由上一章可知Tomcat使用Connector来接收和响应连接请求,这里,对于Netty来讲,如果我们想让其做为一个web服务器,我们先来看一个Netty常见的一个用法(这里摘自官方文档一个例子DiscardServer Demo):

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 丢弃任何进入的数据
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。第一个经常被叫做BossGroup,用来接收进来的连接。第二个经常被叫做WorkerGroup,用来处理已经被接收的连接,一旦BossGroup接收到连接,就会把连接信息注册到WorkerGroup上。如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。
  2. ServerBootstrap 是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
  3. 这里我们通过指定使用 NioServerSocketChannel来举例说明一个新的 Channel 如何接收传进来的连接。
  4. 这里的事件处理类经常会被用来处理一个最近已经接收的 ChannelChannelInitializer 是一个特殊的处理类,目的是帮助使用者配置一个新的 Channel。 使用其对应的ChannelPipeline 来加入你的服务逻辑处理(这里是DiscardServerHandler)。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上(匿名类即ChannelInitializer实例我们可以将其看成是一个代理模式的设计,类似于ReactorSubscriber的设计实现,一层又一层的包装,最后得到一个我们需要的一个可以层层处理的Subscriber)。
  5. 你可以设置这里指定的 Channel 实现的配置参数。如果我们写一个TCP/IP 的服务端,我们可以设置 socket 的参数选项,如tcpNoDelaykeepAlive。请参考 ChannelOptionChannelConfig实现的接口文档来对ChannelOption 的有一个大概的认识。
  6. 接着我们来看 option()childOption() :option() 是提供给NioServerSocketChannel用来接收进来的连接。childOption() 是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel
  7. 剩下的就是绑定端口然后启动服务。这里我们在服务器上绑定了其 8080 端口。当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。

针对bootstrap的option的封装

在看了常见的Netty的一个服务器创建用法之后,我们来看Reactor Netty给我们提供的Http服务器的一个封装:reactor.ipc.netty.http.server.HttpServer。由上面DiscardServer Demo可知,首先是定义一个服务器,方便设定一些条件对其进行配置,然后启动的话是调用其run方法启动,为做到更好的可配置性,这里使用了建造器模式,以便我们自定义或直接使用默认配置(有些是必须配置,否则会抛出异常,这也是我们这里面所设定的内容之一):

//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
    private String bindAddress = null;
    private int port = 8080;
    private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
    private Consumer<? super HttpServerOptions.Builder> options;

    private Builder() {
    }
    ...
    public final Builder port(int port) {
        this.port = port;
        return this;
    }

    /**
        * The options for the server, including bind address and port.
        *
        * @param options the options for the server, including bind address and port.
        * @return {@code this}
        */
    public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
        this.options = Objects.requireNonNull(options, "options");
        return this;
    }

    public HttpServer build() {
        return new HttpServer(this);
    }
}

可以看到,此处的HttpServer.Builder#options是一个函数式动作Consumer,其传入的参数是HttpServerOptions.Builder,在HttpServerOptions.Builder内可以针对我们在DiscardServer Demo中的bootstrap.option进行一系列的默认配置或者自行调控配置,我们的对于option的自定义设置主要还是针对于ServerBootstrap#childOption。因为在reactor.ipc.netty.options.ServerOptions.Builder#option这个方法中,有对它的父类reactor.ipc.netty.options.NettyOptions.Builder#option进行了相应的重写:

//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>>
	extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...}
	
//reactor.ipc.netty.options.ServerOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param <T> the option type
* @return {@code this}
* @see ServerBootstrap#childOption(ChannelOption, Object)
*/
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.childOption(key, value);
return get();
}
//reactor.ipc.netty.options.NettyOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param value the option value
* @param <T> the option type
* @return {@code this}
* @see Bootstrap#option(ChannelOption, Object)
*/
public <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.option(key, value);
return get();
}

这是我们需要注意的地方。然后,我们再回到reactor.ipc.netty.http.server.HttpServer.Builder,从其build这个方法可知,其返回一个HttpServer实例,通过对所传入的HttpServer.Builder实例的options进行判断,接着,就是对bootstrap.group的判断,因为要使用构造器配置的话,首先得获取到ServerBootstrap,所以要先判断是否有可用EventLoopGroup,这个我们是可以自行设定的,这里设定一次,bossGroupworkerGroup可能都会调用这一个,这点要注意下(loopResources源码注释已经讲的很明确了):

//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
    return new HttpServer(this);
}
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
    if (Objects.isNull(builder.options)) {
        if (Objects.isNull(builder.bindAddress)) {
            serverOptionsBuilder.listenAddress(builder.listenAddress.get());
        }
        else {
            serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
        }
    }
    else {
        builder.options.accept(serverOptionsBuilder);
    }
    if (!serverOptionsBuilder.isLoopAvailable()) {
        serverOptionsBuilder.loopResources(HttpResources.get());
    }
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>,
SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>>
implements Supplier<BUILDER> {
    ...
/**
* Provide a shared {@link EventLoopGroup} each Connector handler.
*
* @param eventLoopGroup an eventLoopGroup to share
* @return {@code this}
*/
public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
return loopResources(preferNative -> eventLoopGroup);
}
/**
* Provide an {@link EventLoopGroup} supplier.
* Note that server might call it twice for both their selection and io loops.
*
* @param channelResources a selector accepting native runtime expectation and
* returning an eventLoopGroup
* @return {@code this}
*/
public final BUILDER loopResources(LoopResources channelResources) {
this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
return get();
}

public final boolean isLoopAvailable() {
return this.loopResources != null;
}
...
}

可以看到,这个类是Supplier实现,其是一个对象提取器,即属于一个函数式动作对象,适合用于懒加载的场景。这里的LoopResources也是一个函数式接口(@FunctionalInterface),其设计的初衷就是为io.netty.channel.Channel的工厂方法服务的:

//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {

/**
* Default worker thread count, fallback to available processor
*/
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.workerCount",
    "" + Math.max(Runtime.getRuntime()
                .availableProcessors(), 4)));
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.selectCount",
    "" + -1));
/**
* Create a simple {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*
* @param prefix the event loop thread name prefix
*
* @return a new {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*/
static LoopResources create(String prefix) {
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
        DEFAULT_IO_WORKER_COUNT,
        true);
}
static LoopResources create(String prefix,
			int selectCount,
			int workerCount,
			boolean daemon) {
		...
		return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
	}
...
/**
* Callback for server {@link EventLoopGroup} creation.
*
* @param useNative should use native group if current {@link #preferNative()} is also
* true
*
* @return a new {@link EventLoopGroup}
*/
EventLoopGroup onServer(boolean useNative);
...
}

我们在自定义的时候,可以借助此类的静态方法create方法来快速创建一个LoopResources实例。另外通过LoopResources的函数式特性,可以做到懒加载(将我们想要实现的业务藏到一个方法内),即,只有在使用的时候才会生成所需要的对象实例,即在使用reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources)方法时,可进行loopResources(true -> new NioEventLoopGroup()),即在拿到LoopResources实例后,只有调用其onServer方法,才能拿到EventLoopGroup。这样就可以大大节省内存资源,提高性能。

小结

至此,我们将由netty的普通使用到HttpServer的封装完成通过本章给大家展示出来,目的也是告诉大家这个东西是怎么来的,基于什么样的目的,接下来,我们会依照这个思路一步步给大家揭开Reactor-netty的面纱以及其与Spring webflux是如何对接设计的。