NIO Best Practices


When to subscribe to SelectionKey.OP_WRITE

Symptom: very high CPU usage

Cause: the channel is subscribed to the SelectionKey.OP_WRITE event

    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        SelectionKey selectionKey = iterator.next();
        iterator.remove();
        if (selectionKey.isConnectable()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            if (socketChannel.isConnectionPending()) {
                socketChannel.finishConnect();
            }
            // Problem: OP_WRITE stays in the interest set permanently
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
        // ... handle OP_READ / OP_WRITE
    }

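Analysis: OP_WRITE fires whenever the socket send buffer has free space. That is true most of the time, whenever the network is not congested. Because the NIO selector is level-triggered, OP_WRITE is reported again on every select(), so the while() loop spins continuously.

Level-triggered: put simply, the event keeps firing for as long as the condition holds. Edge-triggered events, by contrast, fire only when the state changes. The distinction is loosely analogous to active versus passive triggering.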
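The following minimal, self-contained sketch shows level-triggering in isolation. It is not from the original article. A `java.nio.channels.Pipe` stands in for a socket, on the assumption that its sink is writable immediately, just like an uncongested socket. Nothing is ever written, so the channel's state never changes, yet every `selectNow()` round should still report OP_WRITE. `LevelTriggerDemo` and `countWritableRounds` are hypothetical names.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class LevelTriggerDemo {

    /**
     * Registers an idle, writable pipe sink for OP_WRITE and counts in how many
     * of `rounds` selectNow() calls it is reported as writable.
     */
    public static int countWritableRounds(int rounds) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false); // required before register()
            sink.register(selector, SelectionKey.OP_WRITE);
            int hits = 0;
            for (int i = 0; i < rounds; i++) {
                // selectNow() never blocks; with a level-triggered selector the
                // still-writable sink is selected again in every round
                if (selector.selectNow() > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isWritable()) {
                            hits++;
                        }
                    }
                }
                // Same effect as the iterator.remove() calls in the loops above
                selector.selectedKeys().clear();
            }
            return hits;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("writable rounds: " + countWritableRounds(5));
    }
}
```

Running `main` should report OP_WRITE in all 5 rounds even though no data moved. This is the spinning while() loop described above, in miniature.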

Best practices:

Option 1: subscribe to OP_WRITE only when there is data to write, and unsubscribe once the data has been sent.

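    // writeBuffer: a java.nio.ByteBuffer kept in write mode; position() > 0 means data is pending
    while (channel.isOpen()) {
        if (channel.isConnected() && writeBuffer.position() > 0) {
            // Data pending: subscribe to OP_WRITE
            channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
        // With temporary OP_WRITE subscription, select(ms) must time out: data may reach
        // writeBuffer right after the check above, while OP_WRITE is still unsubscribed
        int ready = selector.select(300);
        if (ready > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                if (selectionKey.isWritable()) {
                    writeBuffer.flip();               // switch to read mode
                    socketChannel.write(writeBuffer); // single write; may be partial
                    writeBuffer.compact();            // keep unsent bytes, back to write mode
                    if (writeBuffer.position() == 0) {
                        // All data sent: unsubscribe OP_WRITE
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }
                // ... handle OP_READ etc.
            }
        }
    }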

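When OP_WRITE is subscribed only temporarily, you must use selector.select(long) so that select() returns on a timeout. Data may land in writeBuffer just before select() is called, while OP_WRITE is not yet subscribed. A plain select() would then block indefinitely, and the data would not be sent.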
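The following sketch shows a hypothetical helper pair for option 1's subscribe and unsubscribe steps. It uses `SelectionKey.interestOps()` instead of calling `register()` again. It also calls `Selector.wakeup()`, a JDK method that makes a blocked `select()` return immediately, or makes the next one return immediately if none is in progress. When another thread fills `writeBuffer`, this is an alternative to relying only on the `select(300)` timeout. Treat it as a sketch under several assumptions:

* The read-modify-write on `interestOps` is not atomic.
* The JDK documents that whether `interestOps(int)` blocks, and for how long, is implementation-dependent.
* `WriteInterest`, `enableWrite` and `disableWrite` are illustrative names.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public final class WriteInterest {
    private WriteInterest() {}

    /** Adds OP_WRITE to the interest set and wakes the selector so a blocked select() re-evaluates it. */
    public static void enableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        key.selector().wakeup();
    }

    /** Removes OP_WRITE once pending data is flushed, so the key stops being selected as writable. */
    public static void disableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }

    public static void main(String[] args) throws IOException {
        // A Pipe stands in for the socket so the sketch runs without a server
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);
            SelectionKey key = sink.register(selector, 0); // no interest yet
            enableWrite(key);                              // "data arrived": subscribe
            int ready = selector.select(1000);             // does not wait for the timeout here
            System.out.println("ready=" + ready + " writable=" + key.isWritable());
            disableWrite(key);                             // "data flushed": unsubscribe
            System.out.println("interestOps=" + key.interestOps());
        }
    }
}
```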

Option 2: never subscribe to OP_WRITE; write data directly with socketChannel.write().

    Selector selector = Selector.open();
    channel.configureBlocking(false); // must be non-blocking before register()
    channel.register(selector, SelectionKey.OP_CONNECT);
    channel.connect(new InetSocketAddress("localhost", 5555));
    while (channel.isOpen()) {
        if (channel.isConnected()) {
            writeBuffer.flip();
            // Spins if the send buffer is full (see the channel.write() section below)
            while (writeBuffer.hasRemaining()) {
                channel.write(writeBuffer);
            }
            writeBuffer.clear();
        }
        int ready = selector.select(500);
        // ... handle the various events
    }

Option 3: keep OP_WRITE subscribed permanently, and have the writable handler actively pull data from the write buffer.

    while (channel.isOpen()) {
        // Unlike option 1, a plain blocking select() is fine here
        int ready = selector.select();
        if (ready > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // ... clean up data already written from the buffer

                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                if (selectionKey.isConnectable()) {
                    if (socketChannel.isConnectionPending()) {
                        socketChannel.finishConnect();
                    }
                    // Subscribe to read/write events
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
                if (selectionKey.isReadable()) {
                    // ... handle the read event
                }
                if (selectionKey.isWritable()) {
                    // Actively pull pending data from writeBuffer (a Netty ByteBuf)
                    ByteBuffer byteBuffer = awaitGetWrite(writeBuffer, 30, 50);
                    if (byteBuffer != null) {
                        int write = socketChannel.write(byteBuffer);
                        // Advance the ByteBuf reader index by the bytes actually sent
                        writeBuffer.readerIndex(writeBuffer.readerIndex() + write);
                        // If write < byteBuffer.limit(), the socket send buffer is full;
                        // the rest goes out on a later OP_WRITE
                    }
                }
            }
        }
    }

    /**
     * Waits for data in the write buffer.
     * @param byteBuf write buffer (Netty ByteBuf)
     * @param ms      wait time; throttles the loop so an always-writable key does not spin
     * @param cap     threshold: with at least cap readable bytes, return at once;
     *                otherwise wait ms, then return whatever is readable
     * @return a ByteBuffer view of up to socketCap readable bytes, or null if nothing is pending
     */
    public ByteBuffer awaitGetWrite(ByteBuf byteBuf, long ms, int cap) {
        // Maximum bytes per write: keep it moderate and tune as needed
        int socketCap = 1024 * 30;
        if (byteBuf.readableBytes() < cap) {
            // Below the threshold: wait ms so more data can accumulate
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
        int len = Math.min(byteBuf.readableBytes(), socketCap);
        if (len == 0) {
            return null;
        }
        // nioBuffer(index, length) exposes the pending bytes without moving readerIndex;
        // ByteBuffer.allocate(len) would only give an empty, zero-filled buffer
        return byteBuf.nioBuffer(byteBuf.readerIndex(), len);
    }
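| Option | Pros | Cons |
|---|---|---|
| Option 1 | Does not attempt writes while the network is congested | You must manage when to subscribe and unsubscribe OP_WRITE yourself |
| Option 2 | Easy to write; no need to care when to subscribe OP_WRITE | Ignores congestion: it tries to write whenever there is data, doing a lot of useless work when the network is congested |
| Option 3 | Lower coding complexity than option 1 | - |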

All things considered, I personally think option 3 is the best choice.

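Problems with channel.write()

Symptom: very high CPU usage when the network is congested

Cause: during congestion channel.write() keeps failing to write anything, so the while() loop spins

Option 3 from the previous problem avoids this issue.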

    writeBuffer.flip();
    while (writeBuffer.hasRemaining()) {
        channel.write(writeBuffer); // may return 0 when the send buffer is full
    }
    writeBuffer.clear();

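Analysis: when the network is congested, channel.write() may write 0 bytes. The code above writes in an unbounded loop, so if nothing can be written, the loop spins.

Best practice: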

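    // writeBuffer here is a Netty ByteBuf
    while (writeBuffer.isReadable()) {
        // nioBuffer() exposes the readable bytes as a ByteBuffer without moving readerIndex
        ByteBuffer byteBuffer = writeBuffer.nioBuffer();
        channel.write(byteBuffer);
        // Advance readerIndex by the bytes actually written
        writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position());
        int left = byteBuffer.limit() - byteBuffer.position();
        if (left != 0) {
            // Not everything fit into the socket send buffer (it is full), so looping again
            // would spin: break and rely on the outer loop to come back later
            break;
        }
    }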
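The following minimal sketch applies the same break-instead-of-spin idea using only `java.nio.ByteBuffer`, for readers not using Netty. `writeAvailable` is a hypothetical name. It treats a zero-byte `write()` as "send buffer full" and returns, so the caller can wait for the next OP_WRITE instead of looping. A `Pipe` that nobody reads stands in for a congested socket.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.WritableByteChannel;

public final class NonSpinningWrite {
    private NonSpinningWrite() {}

    /**
     * Writes as much of buf (in read mode) as the non-blocking channel accepts right now.
     * @return true if buf was fully drained, false if the caller should wait for OP_WRITE
     */
    public static boolean writeAvailable(WritableByteChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            int n = ch.write(buf);
            if (n == 0) {
                // Send buffer full: stop instead of spinning; unsent bytes stay in buf
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);
            // Nobody reads the pipe, so this payload cannot be written in full
            ByteBuffer big = ByteBuffer.allocate(32 * 1024 * 1024);
            boolean done = writeAvailable(sink, big);
            System.out.println("done=" + done + " remaining=" + big.remaining());
        }
    }
}
```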
                   

Combined with the OP_WRITE subscription issue above, this shows that option 1's temporary OP_WRITE subscription is better at keeping channel.write(byteBuffer) from spinning.

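Detecting TCP disconnection

Symptom: when one side of a TCP connection closes, CPU usage on the other side becomes very high

Cause: after one side closes, OP_READ keeps firing, causing the loop to spin.

Analysis: when the peer closes, OP_READ fires and socketChannel.read(readBuffer) returns -1. This means the peer has closed the connection (end of stream). You must close your side too with socketChannel.close(). Otherwise OP_READ keeps firing and the loop spins.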

    while (true) {
        int ready = selector.select();
        if (ready > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                if (selectionKey.isConnectable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    if (socketChannel.isConnectionPending()) {
                        socketChannel.finishConnect();
                    }
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } else if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
                    int read = socketChannel.read(readBuffer);
                    readBuffer.flip();
                    // Bug: read == -1 is not handled, so OP_READ keeps firing and the loop spins
                    if (read > 0) {
                        System.out.print(new String(readBuffer.array(), 0, read));
                    }
                    readBuffer.clear(); // ready for the next read
                }
                // ...
            }
        }
    }

Best practice:

    if (selectionKey.isReadable()) {
        // Per-connection read buffer from the application's own context object
        ByteBuffer readBuffer = Server.SocketContext.get(socketChannel).getReadBuffer();
        int read = socketChannel.read(readBuffer);
        readBuffer.flip();
        if (read > 0) {
            System.out.print(new String(readBuffer.array(), 0, read));
        } else if (read == -1) {
            // Peer has closed: close our side too (this also cancels the SelectionKey)
            System.out.println("Disconnected: "
                    + socketChannel.socket().getRemoteSocketAddress());
            socketChannel.close();
        }
        readBuffer.clear(); // ready for the next read
    }
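The following self-contained sketch applies the same rule to any `ReadableByteChannel`. A `Pipe` stands in for the TCP connection, and closing its sink plays the role of the peer disconnecting. `ReadOrClose.handleRead` is a hypothetical helper, not part of any library. Closing a selectable channel also cancels its `SelectionKey`s, so the key is no longer selected.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public final class ReadOrClose {
    private ReadOrClose() {}

    /**
     * Handles one readable event: returns the bytes read as UTF-8 text, or closes
     * the channel and returns null when the peer has closed (read() == -1).
     */
    public static String handleRead(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        buf.clear();               // write mode, full capacity
        int read = ch.read(buf);
        if (read == -1) {
            // End of stream: close our side so the channel stops being selected
            ch.close();
            return null;
        }
        buf.flip();                // read mode: [0, read)
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        Pipe.SourceChannel source = pipe.source();
        try (Pipe.SinkChannel sink = pipe.sink()) {
            sink.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));
        } // closing the sink makes the source reach end-of-stream
        ByteBuffer buf = ByteBuffer.allocate(64);
        System.out.println(handleRead(source, buf)); // the data
        System.out.println(handleRead(source, buf)); // null: peer closed
        System.out.println("source open: " + source.isOpen());
    }
}
```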

Using ByteBuf

ByteBuf vs. ByteBuffer

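| Feature | ByteBuffer | ByteBuf |
|---|---|---|
| Indices | position and limit; flip() switches between write mode and read mode, so it cannot be read and written at the same time | readerIndex, writerIndex, capacity, maxCapacity; separate read and write indices, so reading and writing need no mode switch |
| Capacity | Fixed | Expands automatically (up to maxCapacity) |
| Memory | Heap (allocate()) or direct (allocateDirect()) | Direct, heap, or composite |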

Recommendation: use ByteBuf.
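The following small sketch makes the ByteBuffer column above concrete. It shows the fixed capacity and the flip() mode switch using only the JDK; Netty's ByteBuf is left out so the example needs no dependency. `FlipDemo.writeThenFlip` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class FlipDemo {
    private FlipDemo() {}

    /**
     * Writes s into a fresh 16-byte buffer, then flips it so the same bytes can be read back.
     * Capacity is fixed: if s is longer than 16 bytes, put() throws BufferOverflowException.
     */
    public static ByteBuffer writeThenFlip(String s, boolean direct) {
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = direct ? ByteBuffer.allocateDirect(16) : ByteBuffer.allocate(16);
        buf.put(bytes); // write mode: position = bytes.length, limit = capacity
        buf.flip();     // read mode: position = 0, limit = bytes.length
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = writeThenFlip("abc", false);
        byte[] out = new byte[buf.remaining()];
        buf.get(out); // reading advances position up to limit
        System.out.println(new String(out, StandardCharsets.US_ASCII) + " direct=" + buf.isDirect());
    }
}
```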

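ByteBuf clear() vs. discardReadBytes()

Symptom: using clear() loses data

Cause: clear() simply sets readerIndex = writerIndex = 0. If data is written into the buffer at the same time, for example after the last readability check, that data is discarded.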

    if (selectionKey.isWritable()) {
        while (writeBuffer.isReadable()) {
            ByteBuffer byteBuffer = writeBuffer.nioBuffer();
            channel.write(byteBuffer);
            writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position());
            int left = byteBuffer.limit() - byteBuffer.position();
            if (left != 0) {
                // Could not write everything at once; break to avoid spinning
                // ...
                break;
            } else {
                // Discard the data already sent; bytes written since nioBuffer() was taken are lost here
                writeBuffer.clear();
            }
        }
        // ...
    }

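Best practice:

Use discardReadBytes(). Instead of resetting both indices, it copies the unread bytes (between readerIndex and writerIndex) to the start of the buffer, so bytes not yet sent are kept. Note that ByteBuf itself is not thread-safe, so concurrent writes still need external synchronization. Frequent copying also costs performance, so use clear() and discardReadBytes() together: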

    if (selectionKey.isWritable()) {
        while (writeBuffer.isReadable()) {
            // Compact when more than 2/3 of the buffer is used
            // and less than maxCapacity/3 is still unread (writerIndex - readerIndex)
            if (writeBuffer.writerIndex() > (writeBuffer.maxCapacity() / 3 * 2)
                    && writeBuffer.writerIndex() - writeBuffer.readerIndex() < (writeBuffer.maxCapacity() / 3)) {
                System.out.println(String.format("Buffer over 2/3 used, discardReadBytes writerIndex:%d "
                        + "readerIndex:%d", writeBuffer.writerIndex(), writeBuffer.readerIndex()));
                writeBuffer.discardReadBytes();
            }

            ByteBuffer byteBuffer = writeBuffer.nioBuffer();
            channel.write(byteBuffer);
            writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position());
            int left = byteBuffer.limit() - byteBuffer.position();
            if (left != 0) {
                // Could not write everything at once; spinning is possible
                // ...
                // Break to avoid spinning and wait for the next OP_WRITE
                break;
            } else {
                // Be careful with clear(): writeBuffer keeps receiving data, so writerIndex may be > readerIndex
                if (writeBuffer.writerIndex() == writeBuffer.readerIndex()) {
                    // TODO: this check-then-clear is not atomic, so in theory it can still lose data;
                    // no problem has been observed in practice, still to be verified
                    writeBuffer.clear();
                    System.out.println("clear");
                }
            }
        }
        // ...
    }
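`java.nio.ByteBuffer` offers a close analogue of this trade-off. `clear()` resets position and limit and so forgets unread bytes. `compact()` copies the unread bytes to the front, much like `discardReadBytes()`. The following single-threaded, JDK-only sketch shows the difference deterministically; class and method names are hypothetical. It does not address the concurrency issue, which still needs synchronization.

```java
import java.nio.ByteBuffer;

public final class ClearVsCompact {
    private ClearVsCompact() {}

    /** A buffer in read mode holding bytes 0..9, of which the first 4 have already been sent. */
    static ByteBuffer partiallyRead() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        for (byte i = 0; i < 10; i++) {
            buf.put(i);
        }
        buf.flip();      // read mode: position=0, limit=10
        buf.position(4); // pretend 4 bytes were written to the socket
        return buf;
    }

    /** clear(): the 6 unsent bytes are treated as free space and effectively lost. */
    public static ByteBuffer afterClear() {
        ByteBuffer buf = partiallyRead();
        buf.clear();     // write mode: position=0, limit=16
        buf.flip();      // nothing written since clear(): limit=0
        return buf;
    }

    /** compact(): bytes 4..9 move to 0..5 and the buffer continues in write mode after them. */
    public static ByteBuffer afterCompact() {
        ByteBuffer buf = partiallyRead();
        buf.compact();   // position=6, limit=16
        buf.flip();      // read mode: position=0, limit=6
        return buf;
    }

    public static void main(String[] args) {
        System.out.println("unsent after clear(): " + afterClear().remaining());
        System.out.println("unsent after compact(): " + afterCompact().remaining());
    }
}
```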

The NIO epoll empty-polling bug
