如何构建可伸缩的高性能网络服务(Reactor编程模式详解)

552 阅读7分钟

Reactor网络编程模型是java.util.concurrent包的作者Dung Lea在《Scalable IO in Java》中提出的高性能的网络编程服务架构模式,Netty,Mina等高性能的NIO服务框架都采用的是Reactor模式。

1. 网络服务

网络服务以不同的形式存在,如web服务、分布式服务等,但是它们大多具备相同的处理流程。

  1. 接收请求,从网络I/O中读取字节流数据。
  2. 对请求进行解码,将字节流转为约定的对象。
  3. 处理业务逻辑。
  4. 对响应进行编码,将经过业务逻辑处理后转为字节流。
  5. 返回响应,发送响应到网络I/O。

2. 传统的服务设计模式

在网络服务中为每一个连接的处理开启一个新的线程,如下图:

看我如何把NIO拉下神坛中已经分析过这种模式严重依赖线程,系统伸缩性差,无法应对海量的请求。

3. 高性能可伸缩的服务设计模式

在构建高性能可伸缩的网络服务的过程中,我们希望达到以下的目标:

  1. 在海量请求高负载的情况下能够优雅降级。

  2. 硬件资源的升级能够持续的为系统带来性能提升(cpu,内存,磁盘,带宽)。

  3. 还要满足可用性和性能目标

    • 低延时
    • 高负载
    • 可调节的服务质量

分治法通常是最好的实现任何可伸缩性目标的方法。

4. 分治法(Divide and Conquer)

  1. 将完整的处理过程分解为若干个小任务。
  2. 每个小任务执行一个动作,并且不产生阻塞。
  3. 在任务准备好的时候去执行它。这里,一个I/O事件通常被作为触发器。

java.nio提供了实现分治法的基本机制

  • 非阻塞的读写。
  • 基于I/O事件分发相关的任务(读就绪,写就绪,连接事件)。

基于事件驱动的设计模式,为高性能网络服务架构带来丰富的可扩展性。

5. 基于事件驱动的设计模式(Event-driven Designs)

基于事件驱动的设计模式通常比其他的选择更加有效。

  • 占用的资源更少:不需要为每个客户端开启一个新的线程。
  • 开销小:更少的线程上下文切换,更少的锁。

但是任务调度相对要慢一些,并且更难编程。因为必须手动将操作绑定到事件,相关的功能必须拆封成简单的非阻塞操作,类似于GUI事件驱动机制,当然不可能消除所有的阻塞,例如GC,页中断(page faults)等。必追踪服务的逻辑状态(因为是事件驱动的,所以需要根据状态判断执行的动作)。

5.1 AWT中的事件驱动设计

6. Reactor模式

  • Reactor模式通过分配适当的处理器(Handler)来响应IO事件,类似于AWT的线程。
  • 每个Handler执行非阻塞操作,类似于动作监听(ActionListeners)。
  • 通过将Handler绑定到事件进行管理,类似于AWT的添加动作监听(addActionListeners)。

Reactor模式核心角色

  • Reactor:将IO事件分发给Acceptor或者对应的Handler。
  • Acceptor:处理新的客户端连接,创建连接对应的Handler。
  • Handler:执行非阻塞的读写任务。

6.1 单线程模式

  1. Reator线程初始化
@Slf4j
public class ServerReactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    @Setter
    private volatile boolean stop = false;

    // ************ Reactor 1: Setup ***********************
    
    public ServerReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        // 将channel注册到多路复用器上,并监听ACCEPT事件
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        log.info("ServerSocket-Key: [{}]",selectionKey);
        // 添加Acceptor处理新连接
        selectionKey.attach(new Acceptor(selector, selectionKey, serverSocketChannel));
    }

    @Override
    public void run() {

        // ************ Reactor 2: Dispatch Loop ***********************

        try {
            // 无限的接收客户端连接
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    KeyUtil.keyOps(key);
                    try {
                        dispatch(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            log.error("{}", e);
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                log.error("{}", e);
            }

        }
    }

    private void dispatch(SelectionKey key) {
        // 如果是连接事件获取是Acceptor
        // 如果是是IO读写事件获取是对应的Handler
        Runnable runnable = (Runnable) key.attachment();
        runnable.run();
    }
}

  1. 第二步:Acceptor创建
@Slf4j
public class Acceptor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    final SelectionKey selectionKey;

    // ************ Reactor 3: Acceptor ***********************

    public Acceptor(Selector selector, SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.selectionKey = selectionKey;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress());
                // 创建对应的Handler
                new BasicHandler(selector, socketChannel);
            }

        } catch (IOException e) {
            log.error("{}", e);
        }
    }
}
  1. 第三步:Handler创建
@Slf4j
public class BasicHandler implements Runnable {
    final SelectionKey selectionKey;
    final SocketChannel socketChannel;

    // ************ Reactor 4: Handler ***********************

    public BasicHandler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        this.socketChannel.configureBlocking(false);
        selectionKey = this.socketChannel.register(selector, 0);
        // attach替换
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
        log.info("Socket-Key: [{}]", selectionKey);

    }

    @Override
    public void run() {
        try {
            if (selectionKey.isReadable()) {
                doRead();
            } else if (selectionKey.isWritable()) {
                doWrite();

            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doWrite() {
        Scanner scanner = new Scanner(System.in);
        new Thread(() -> {
            while (scanner.hasNext()) {
                try {

                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(scanner.nextLine().getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);
                } catch (Exception e) {

                }
            }
        }).start();
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    protected void doRead() throws IOException {
        // setup1: ****一次性读取数据***
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int readBytes = socketChannel.read(readBuffer);
        if (readBytes > 0) {
            // 业务处理
            process(readBuffer);
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } else if (readBytes < 0) {
            selectionKey.cancel();
            socketChannel.close();
        }

    }

    protected void process(ByteBuffer readBuffer) {

        // setup2: ****解码***

        // setup3: ****处理数据***
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        log.info("recv client content: " + new String(bytes));
        try {
            TimeUnit.SECONDS.sleep(10);
            log.info("业务处理完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // setup4: ****编码***
        
    }

}

该模型适用于Handler能快速处理业务逻辑的场景。串行化处理可以最大限度减少锁的竞争,但是不能充分利用CPU多核的资源。

6.2 单Reactor多Work线程模型

为了充分利用多核的优势,我们可以采用多线程模式用于处理非IO操作来获得更高的伸缩性,一般来说业务处理(process方法)会有耗时比较长的可能,而业务处理阻塞会影响Reacotr的性能,所以我们可以把业务处理的**非IO操作(编/解码,业务逻辑)**交给Work线程池来处理。

  1. 卸载非IO操作来提升Reactor线程的处理性能,类似于POSA2中Proactor的设计。
  2. 比将非IO操作重新设计为事件驱动的方式更简单。
  3. 很难与IO重叠处理,最好能先将所有输入读入缓冲区。
  4. 使用线程池对线程资源进行调优与控制,通常情况下需要的线程数量比客户端数量少很多。

@Slf4j
public class MultiThreadHandler extends BasicHandler {
    static final int threadPoolSize = Runtime.getRuntime().availableProcessors() + 1;
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threadPoolSize,
            threadPoolSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(1000));

    public MultiThreadHandler(Selector selector, SocketChannel socketChannel) throws IOException {
        super(selector, socketChannel);
    }


    @Override
    protected void doRead() throws IOException {
        // setup1: ****一次性读取数据***
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int readBytes = socketChannel.read(readBuffer);
        if (readBytes > 0) {
            // 线程池进行业务处理
            executor.execute(new Processer(readBuffer));
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } else if (readBytes < 0) {
            selectionKey.cancel();
            socketChannel.close();
        }
    }

    class Processer implements Runnable {
        final ByteBuffer buffer;

        Processer(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public void run() {
            process(buffer);
        }
    }

}

6.3 多Reactor多Worker线程模型

为了进一步协调CPU和IO读写效率,提升系统的资源利用率。可以将Reactor拆封为两个部分,MainReactor负责监听socket,用于处理accept事件,将建立的连接分发给SubReactor,SubReactor由线程池执行,负责处理IO读写事件。Netty采用的就是这种模式。

@Slf4j
public class MainReactor implements Runnable {
    final Selector selector;

    final ServerSocketChannel serverSocketChannel;

    @Setter
    private volatile boolean stop = false;

    public MainReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        // 将channel注册到多路复用器上,并监听ACCEPT事件
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        log.info("ServerSocket-Key: [{}]", selectionKey);
        // 添加Acceptor处理新连接
        selectionKey.attach(new MultiAcceptor(selectionKey, serverSocketChannel));
    }


    @Override
    public void run() {
        try {
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    KeyUtil.keyOps(key);
                    try {
                        dispatch(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            log.error("{}", e);
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable runnable = (Runnable) key.attachment();
        runnable.run();
    }
}
@Slf4j
public class MultiAcceptor implements Runnable {
    static final int subReactorSize = Runtime.getRuntime().availableProcessors();
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            subReactorSize,
            subReactorSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(1000), new NameThreadFactory("subReactor")
    );
    private Selector[] selectors = new Selector[subReactorSize]; // 多路复用器个数和SubReactor线程个数相同
    private ServerSocketChannel serverSocketChannel;
    final SelectionKey selectionKey;

    private volatile int next = 0;

    public MultiAcceptor(SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) throws IOException {
        this.selectionKey = selectionKey;
        this.serverSocketChannel = serverSocketChannel;
        init();
    }

    public void init() throws IOException {
        for (int i = 0; i < subReactorSize; i++) {
            selectors[i] = Selector.open();
        }
    }

    @Override
    public synchronized void run() {
        try {
            if (selectionKey.isValid() && selectionKey.isAcceptable()) {

                SocketChannel socketChannel = serverSocketChannel.accept();
                log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress());

                if (socketChannel != null) {
                    executor.execute(new SubReactor(selectors[next], socketChannel));
                }
                if (++next == selectors.length)
                    next = 0;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
@Slf4j
public class SubReactor implements Runnable {

    private Selector selector;

    public SubReactor(Selector selector, SocketChannel socketChannel) throws IOException {
        this.selector = selector;
        new MultiThreadHandler(selector, socketChannel);
    }

    @Setter
    private boolean stop = false;

    @Override
    public void run() {
        // ************ Reactor 2: Dispatch Loop ***********************

        try {
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    KeyUtil.keyOps(key);
                    try {
                        dispatch(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            log.error("{}", e);
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                log.error("{}", e);
            }

        }
    }

    private void dispatch(SelectionKey key) {
        Runnable runnable = (Runnable) key.attachment();
        runnable.run();
    }
}

拆分后的Reactor模型,扩展性更强。不同的Reactor职责更加明确,一般来说一个MainReactor线程和多个SubReactor线程就能处理百万级别的客户端连接。

参考文献:

  1. gee.cs.oswego.edu/dl/cpjslide…