简单的聊聊NIO的三种线程模型

978 阅读3分钟

这篇文章主要是接下来Netty源码阅读过程的预热,Netty的线程模型与这边文章提供的三种相关。Netty没有第二种,但是在主从Reactor多工作线程上又多了个多主线程模型。我的所有的NIO相关文章,都是为接下来Netty源码阅读系列的准备。

NIO对应的线程模型的设计模型

  • Reactor单线程模型
  • Reactor多工作线程模型
  • 主从Reactor多工作线程模型

Reactor单线程模型

/**
 * @Author CoderJiA
 * @Description NIOServer
 * @Date 13/2/19 下午4:59
 **/
public class NIOServer {

    public static void main(String[] args) throws Exception{

        // 1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2.创建Selector,并ServerSocketChannel注册OP_ACCEPT事件,接收连接。
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3.开启轮询
        while (selector.select() > 0) {
            // 从selector所有事件就绪的key,并遍历处理。
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey -> {
                SocketChannel client;
                try {
                    if (selectionKey.isAcceptable()) {  // 接受事件就绪
                        // 获取serverSocketChannel
                        ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
                        // 接收连接
                        client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {  // 读事件就绪
                        // 获取socketChannel
                        client = (SocketChannel) selectionKey.channel();
                        // 创建buffer,并将获取socketChannel中的数据读入到buffer中
                        ByteBuffer readBuf = ByteBuffer.allocate(1024);
                        int readCount = client.read(readBuf);
                        if (readCount <= 0) {
                            return;
                        }
                        Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
                        readBuf.flip();
                        System.out.println(String.valueOf(charset.decode(readBuf).array()));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selectionKeys.remove(selectionKey);
            });
        }

    }

Reactor多工作线程模型

/**
 * @Author CoderJiA
 * @Description NIOServer
 **/
public class NIOServer {

    public static void main(String[] args) throws Exception{

        // 1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2.创建Selector,并ServerSocketChannel注册OP_ACCEPT事件,接收连接。
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3.开启轮询
        while (selector.select() > 0) {
            // 从selector所有事件就绪的key,并遍历处理。
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey -> {
                SocketChannel client;
                try {
                    if (selectionKey.isAcceptable()) {  // 接受事件就绪
                        // 获取serverSocketChannel
                        ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
                        // 接收连接
                        client = server.accept();
                        client.configureBlocking(false);
                        SelectionKey readKey = client.register(selector, SelectionKey.OP_READ);
                        readKey.attach(new Processor());
                    } else if (selectionKey.isReadable()) {  // 读事件就绪
                        Processor processor = (Processor) selectionKey.attachment();
                        processor.process(selectionKey);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selectionKeys.remove(selectionKey);
            });
        }

    }

}
/**
 * @Author CoderJiA
 * @Description Processor
 **/
public class Processor {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(16);

    public void process(SelectionKey selectionKey) {
        EXECUTOR_SERVICE.execute(() -> {
            try {
                // 获取socketChannel
                SocketChannel client = (SocketChannel) selectionKey.channel();
                // 创建buffer,并将获取socketChannel中的数据读入到buffer中
                ByteBuffer readBuf = ByteBuffer.allocate(1024);
                int readCount = client.read(readBuf);
                if (readCount <= 0) {
                    return;
                }
                Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
                readBuf.flip();
                System.out.println(charset.decode(readBuf).array());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

}

主从Reactor多线程模型

/**
 * @Author CoderJiA
 * @Description NIOServer
 * @Date 13/2/19 下午4:59
 **/
public class NIOServer {

    public static void main(String[] args) throws Exception{

        // 1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));

        // 2.创建Selector,并ServerSocketChannel注册OP_ACCEPT事件,接收连接。
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 3.创建processor
        int coreProcessorNum = Runtime.getRuntime().availableProcessors();
        Processor[] processors = new Processor[coreProcessorNum];
        IntStream.range(0, processors.length).forEach(i -> processors[i] = new Processor());

        // 4.开启轮询
        MutableInt index = new MutableInt(0);
        while (selector.select() > 0) {
            // 主Reactor监听连接
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey -> {
                try {
                    if (selectionKey.isAcceptable()) {  // 接受事件就绪
                        // 获取serverSocketChannel
                        ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
                        // 接收连接
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        Processor processor = processors[(index.getValue()) % coreProcessorNum];
                        addVal(coreProcessorNum, index);
                        processor.addSocketChannel(client);
                        processor.wakeup();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selectionKeys.remove(selectionKey);
            });
        }

    }

    private static void addVal(int coreProcessorNum, MutableInt index) {
        if (index.getValue() > coreProcessorNum * 100) {
            index.setValue(0);
        } else {
            index.increment();
        }
    }

}
/**
 * @Author CoderJiA
 * @Description Processor
 **/
public class Processor {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    private Selector selector;

    public Processor() {
        try {
            this.selector = Selector.open();
            start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void addSocketChannel(SocketChannel socketChannel) throws ClosedChannelException {
        System.out.println("processor addSocketChannel...");
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    public void wakeup() {
        System.out.println("processor wakeup...");
        this.selector.wakeup();
    }

    private void start() {
        System.out.println("processor start...");
        EXECUTOR_SERVICE.execute(this::run);
    }

    private void run() {
        System.out.println("processor run...");
        try {
            while (true) {
                if (selector.select(100) <= 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                selectionKeys.forEach(selectionKey -> {
                    try {
                        if (selectionKey.isReadable()) {
                            // 获取socketChannel
                            SocketChannel client = (SocketChannel) selectionKey.channel();
                            // 创建buffer,并将获取socketChannel中的数据读入到buffer中
                            ByteBuffer readBuf = ByteBuffer.allocate(1024);
                            int readCount = client.read(readBuf);
                            if (readCount <= 0) {
                                return;
                            }
                            Charset charset = Charset.forName("UTF-8");
                            readBuf.flip();
                            System.out.println(charset.decode(readBuf).array());
                        }
                        selectionKeys.remove(selectionKey);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

参考文章地址:www.jasongj.com/java/nio_re…

代码地址: github.com/coderjia061…