阅读 841

NIO 在Jetty中的应用

引子

作为纵横情场多年的老手,宪程在把到妹子后通常有以下策略 (假设宪程是影流之主的第1024代传人并且只剩下了分身的能力)

  • 将妹子存到队列中,不时发微信去撩一下,如果有意向的话宪程会使用分身能力再创建一个宪程去把妹

  • 宪程自己执行把妹的操作,如果期间又有新的妹子看上他咋办呢,那就将该妹子交给自己的分身宪程去轮询处理,并且宪程在把完妹子之后会尝试去把分身宪程的轮询任务给接过来,毕竟本体总是要掌握主动权的,如果没有接过来咋办?只能选择成为分身了,毕竟此时分身宪程已经接过了本体的工作,某种意义上他已经成为了本体

Jetty NIO 模型

建议在阅读之前先了解以下Tomcat的NIO模型,没有对比就没有伤害,你会发现Jetty NIO模型的有趣之处

概述

如果时间充足的话,我建议你直接阅读附录,了解如何Debug Jetty NIO功能

既然要了解Jetty的NIO模型,从线程的角度来说可以分为以下几类

  • 空闲线程 此角色会根据提交到线程池中的任务,将自己转变为I/O线程或者轮询线程

  • Acceptor线程 该角色主要负责接收来自客户端的连接并对其进行封装之后,选择一个Selector来提交此任务

  • 轮询线程 此角色主要负责轮询事件,并处理其他角色提交给此角色的任务,另外此角色可以根据所设定的策略将轮询任务交给其他线程,在执行完I/O任务之后归还到线程池中成为空闲线程

主要参与的类有

  • Connector 该角色主要负责JettyNIO模型中各个组件的启动和,协调工作

  • SelectorManager 此角色主要对ManagedSelector进行管理,想要和Selector进行交互可以使用此类

  • ManagedSelector 封装了JDK原生的selector, 并对外提供对selector执行操作的内部类、接口以及方法

重点 所有线程共用一个线程池

Connector

关键类 org.eclipse.jetty.server.ServerConnector

Connector即连接器,是Jetty对于网络I/O模型的一个抽象,主要负责组装,启动Jetty NIO模型中所需要用到的组件。因此,我们主要注意力集中到其实现上也就是ServerConnector上。

初始化Connector连接器,我们需要向其提供以下关键参数(隐去了和本文无关的参数,有兴趣的可自行了解)

  • 用来执行接收新连接、处理I/O、轮询事件任务的线程池
  • ByteBuffer 对象池, 该对象池可以回收以及提供ByteBuffer给I/O线程使用
  • 负责执行accept操作线程的数量
  • 负责执行轮询任务的selector线程数量

但是,大部分的初始化工作并不是在ServerConnector中执行的,而是在其父类中执行的操作,因此我们将目光转移到 org.eclipse.jetty.server.AbstractConnector

该类的初始化代码如下,其主要做了以下工作

  • 检查是否指定线程池,如果没有则和Server共用一个线程池
  • 检查是否指定ByteBufferPool,如果没有则使用ArrayByteBuffer
  • 检查是否设置Acceptor数量,如果没有则按照max(1,min(4,CPU核心数÷8))进行计算,也就是说默认的Acceptor数量最少有一个,最多有4个

想象一下,如果ServerSocketChannel被设置为阻塞状态以便多个线程同时执行accept操作,那么大多数情况下多数线程将会陷入阻塞状态,并且线程从阻塞态恢复是有线程上下文切换的成本的因此Acceptor线程并不是越多越好

    public AbstractConnector(
        Server server,
        Executor executor,
        Scheduler scheduler,
        ByteBufferPool pool,
        int acceptors,
        ConnectionFactory... factories)
    {
        _server = server;
        //检查是否设置线程池,如果没有则使用Server的
        _executor = executor != null ? executor : _server.getThreadPool();
        if (scheduler == null)
            scheduler = _server.getBean(Scheduler.class);
        _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
        
        // 检查是否指定ByteBufferPool,如果没有则自己创建一个
        if (pool == null)
            pool = _server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
        // 将这些对象交给Jetty统一管理(不在本文讨论范围内,不展开)
        addBean(_server, false);
        addBean(_executor);
        if (executor == null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // ConnectionFactory主要使用来处理对应的HTTP协议
        for (ConnectionFactory factory : factories)
        {
            addConnectionFactory(factory);
        }
        // 如果未指定Acceptor的数量则根据CPU核数执行计算
        int cores = ProcessorUtils.availableProcessors();
        if (acceptors < 0)
           //根据此式可以推出Acceptor数量最大是4最小是1
            acceptors = Math.max(1, Math.min(4, cores / 8));
        // Acceptor数量大于CPU核心数
        // 将会引起大量的线程陷入阻塞状态
        // 没有东西可以accept不就阻塞了吗
        // 而要激活阻塞的线程则需要切换线程上下文会引起性能的浪费
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
        _acceptors = new Thread[acceptors];
    }

复制代码

如下图所示我的电脑为4核心的i5CPU,那么默认的Acceptor线程应该只有一个

4核心CPU
在启动你的Jetty之后我们可以用JConsole来验证一下
正如你所看到的,以qtp开头的线程用于NIO的线程池,其中一个Acceptor线程阻塞在accept()方法上

Acceptor

Acceptor是一个定义在AbstractConnector中的内部类, 其主要工作不断调用在子类中实现accept方法,也就是接收连接的实现延迟到了子类中。

其代如下,可以学到不少小技巧, 如果你不想看代码,其总结如下

  • 获取执行当前代码线程,给他起个名字,见上一节JConsole的截图
  • 将Acceptor线程优先级调至最高(当然,不一定起作用,还得看人操作系统理不理你)
  • 在执行accept操作之前需要等待来自其他线程的放行信号
  • 不断循环执行accept操作
      public void run()
        {
           // 给线程起给名字
            final Thread thread = Thread.currentThread();
            String name = thread.getName();
            _name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
            thread.setName(_name);
            // 设置优先级
            int priority = thread.getPriority();
            if (_acceptorPriorityDelta != 0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
            // 保存对此线程的引用
            _acceptors[_id] = thread;
            
            try
            {
                while (isRunning())
                {
                    // 加锁,等待来自其他线程的信号说可以开始干活了
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        continue;
                    }

                    try
                    {
                       //调用子类的accept方法
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
               // 发生异常了,则将线程的名称以及优先级调回原来的值
                thread.setName(name);
                if (_acceptorPriorityDelta != 0)
                    thread.setPriority(priority);
                
                //释放引用
                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping = _stopping;
                if (stopping != null)
                    stopping.countDown();
            }
        }
复制代码

在子类ServerConnector中,accept主要执行以下操作

  • 阻塞的形式接收来自客户端的连接
  • 设置客户端SocketChannel非阻塞模式,并禁用nagle算法
  • 交给SelectorManager来处理, 该类会将客户端SocketChannel封装成一个Accept事件,交给轮询线程处理 ServerConnector中的代码
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
            accepted(channel);
        }
    }

    private void accepted(SocketChannel channel) throws IOException
    {
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket); // socket.setTcpNoDelay(true);
        _manager.accept(channel);
    }
复制代码

SelectorManager中最终被调用的代码

    public void accept(SelectableChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector();
        selector.submit(selector.new Accept(channel, attachment));
    }
复制代码

轮询线程

轮询线程主要负责轮询I/O事件以及处理其他线程提交到本线程任务。并且我们可以为轮询线程指定执行策略, 在后面我们可以看到执行策略将如何影响轮询线程行为。

首先,我们需要先明确哪些类会参与到轮询线程的工作中,也就是说我们要先理清楚轮询线程的调用链。

如上图堆栈跟踪图红框所标注的部分所示,参与到轮询线程主要堆栈结构如下图所示。

  • ManagedSelector 此类主要封装了JDK的selector类,并对外暴露操作此Selector的方法和类
  • EatWhatYouKill 此类即轮询线程执行策略,该类会不断调用SelectorProducer.produce 方法产生封装好的I/O任务,并根据其策略来决定执行这个I/O任务的方式
  • SelectorProducer 此类为ManagedSelector的内部类,实现线程执行策略里面的ExecutionStrategy.Producer接口,该类专门用于生成供轮询线程处理的I/O任务

ManagedSelector

Jetty将JDK原生的Selector类封装成为ManagedSelector,该类主要功能是对外暴露对其封装的selector执行操作的接口和内部类. 其关键方法和内部类如下

SelectorUpdate接口 如果要对ManagedSelector所管理的selector进行更新(如执行注册感兴趣的I/O事件)可以实现此接口,该接口定义如下

    public interface SelectorUpdate
    {
        void update(Selector selector);
    }
复制代码

submit方法 该方法主要用于外界将SelectorUpdate提交到轮询线程中以便执行对Selector的更新操作,简单来说此方法会执行以下操作

  • 将update事件加入队列
  • 检查Selector是否正在执行select操作,如果是则将其唤醒,使其从阻塞状态返回以便我们对其进行更新
    public void submit(SelectorUpdate update)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}", update, this);

        Selector selector = null;
        synchronized (ManagedSelector.this)
        {
            //加事件加入处理队列
            _updates.offer(update);
            //检查是否正在轮询,如果正在轮询,则会执行唤醒操作
            //因此在此处需要将selecting置为false
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }

        if (selector != null)
        {
           //执行唤醒操作,以便对selector执行更新操作
            if (LOG.isDebugEnabled())
                LOG.debug("Wakeup on submit {}", this);
            selector.wakeup();
        }
    }
复制代码

SelectorProducer

SelectorProducerManagedSelector的内部类,该类实现了轮询线程执行策略的ExecutionStrategy.Producer接口

    interface Producer
    {
        // 返回一个Runnable任务供轮询线程执行
        Runnable produce();
    }
复制代码

因此SelectorProducer需要不断调用selector去轮询看有无新的I/O事件以供处理,除此之外它还需要处理外部类向ManagedSelector通过调用submit方法提交的SelectorUpdate任务

其向线程执行策略类所提供produce方法代如下所示,总的来说主要完成以下几项工作

  • 执行一个循环,直到轮询到感兴趣的任务(一次只返回一个,被轮询到事件会被保存起来供下一次使用)
  • 处理外部类向其提交的任务(调用processUpdates)
  • 更新客户端SocketChannel感兴趣的事件
        @Override
        public Runnable produce()
        {
            while (true)
            {
                //处理之前查询到事件
                Runnable task = processSelected();
                if (task != null)
                    return task;
                //处理外部类所提交的update任务
                //该方法最终会导致提交的SelectorUpdate.update被调用
                processUpdates();
                //此方法的调用可能会
                //导致客户端SocketChannel感兴趣的事件发生变更
                updateKeys();
                //执行select操作,并将查询到事件保存起来
                if (!select())
                    return null;
            }
        }
复制代码

processUpdates 此方法主要是处理外部类提交的SelectorUpdate任务,通过复制引用非常巧妙的避免了并发问题

        private void processUpdates()
        {
            synchronized (ManagedSelector.this)
            {
                //倒腾数据,将要处理队列的引用保存
                //到另一个变量上,原有的引用可以继续对外提供服务
                //整个数据倒腾过程非常短,性能影响较小
                Deque<SelectorUpdate> updates = _updates;
                _updates = _updateable;
                _updateable = updates;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updateable {}", _updateable.size());
            //遍历事件队列,处理update方法
            for (SelectorUpdate update : _updateable)
            {
                if (_selector == null)
                    break;
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("update {}", update);
                    //调用事件的update方法,并传入selector
                    update.update(_selector);
                }
                catch (Throwable ex)
                {
                    LOG.warn(ex);
                }
            }
            _updateable.clear();

            Selector selector;
            int updates;
            //再次检查是否有新的事件被提交,如果有则执行唤醒操作
            synchronized (ManagedSelector.this)
            {
               //外部类提交的任务会保存到updates中
                updates = _updates.size();
                _selecting = updates == 0;
                selector = _selecting ? null : _selector;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updates {}", updates);

            if (selector != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("wakeup on updates {}", this);
                selector.wakeup();
            }
        }

复制代码

select() 该方法主要执行轮询操作,并将轮询到事件保存起来以供下一次循环的时候返回,在这个方法中展现jetty如何处理空轮询事件(空轮询是指selector在执行select操作时,没有查询到任何事件却返回了,这个BUG通常会造成CPU100%的使用率,从而使系统崩溃)

        private boolean select()
        {
            try
            {
                Selector selector = _selector;
                if (selector != null && selector.isOpen())
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
                    int selected = selector.select();
                    //没查询到事件, 空轮询事件处理
                    if (selected == 0)
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Selector {} woken with none selected", selector);
                        //如果线程被中断,并且标志位被设置了不在运行则执行推出逻辑
                        if (Thread.interrupted() && !isRunning())
                            throw new ClosedSelectorException();
                        //开启了此参数则立即执行一次select操作
                        if (FORCE_SELECT_NOW)
                            selected = selector.selectNow();
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());

                    int updates;
                    synchronized (ManagedSelector.this)
                    {
                        // 完成了select操作则设置标志位
                        _selecting = false;
                        updates = _updates.size();
                    }

                    _keys = selector.selectedKeys();
                    _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);

                    return true;
                }
            }
            catch (Throwable x)
            {
                _selector = null;
                if (isRunning())
                    LOG.warn(x);
                else
                {
                    LOG.warn(x.toString());
                    LOG.debug(x);
                }
                closeNoExceptions(_selector);
            }
            return false;
        }
复制代码

与Netty的空轮询处理策略不同,Jetty的处理策略是再select一次并立即返回,但这样似乎并不能解决空轮询的BUG问题详情

EatWhatYouKill

EatWhatYouKill是线程执行策略的一种,也是Jetty默认的指策略,其思想来源于如果猎人杀死一只猎物,那么猎人就应该吃掉它(如果你吃过新鲜的虾你就会对这种哲学深有体会),换种说法就是轮询线程如果查询到一次I/O事件就应该直接处理它(想起引子了吗)

P.S. 关键代码org.eclipse.jetty.util.thread.strategy.EatWhatYouKill

之所以这样做的原因是因为切换线程是一件比较费时操作(相对来说),因此在这种策略下轮询线程A如果获取到一个事件会有以下策略

  • 如果此任务被标志为非阻塞任务,那么线程A会立即执行此任务

如果任务阻塞类型未知或者被标记为阻塞状态

  • 如果线程池中的线程都处于繁忙状态,则将其提交到线程池种等待执行

  • 如果线程池种有空闲线程B,则尝试将线程A负责轮询功能交给线程B,如果立即获取到线程B成功,则线程A会直接执行获取到的任务, 任务执行完成后,线程A会尝试夺回交给线程B的轮询任务,如果夺回失败则变为空闲线程等待分配任务。(想起引子了吗?)

  • 除此之外,线程A还会尝试直接执行任务并且不会交出轮询工作 (代码太长,只摘出关键代码)

    case BLOCKING:
        synchronized (this)
        {
            if (_pending)
            {
                //轮询工作陷入了停滞,因此是IDLE状态
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }
            //tryExecute 如果立即分配到了线程则返回true
            //this的run方法也就是实现轮询线程核心的方法
            //因此此行代码相当于将轮询的工作转移给了其他线程
            else if (_tryExecutor.tryExecute(this))
            {
                _pending = true;
                //由于轮询工作的转移
                //因此当前轮询工作相当于陷入空闲状态
                //所以需要将此对象的状态至为IDLE
                //(轮询线程和当前线程使用同一个对象)
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }else
            {
               //前两者均不满足则将任务提交到线程池
                mode = Mode.PRODUCE_EXECUTE_CONSUME;
            }
        }
        break;
复制代码

任务的执行策略

            case EXECUTE_PRODUCE_CONSUME:
                _epcMode.increment();
                //直接在当前线程调用
                runTask(task);

                // 尝试夺回轮询任务
                synchronized (this)
                {
                   // 如果State还处于空闲状态
                   // 说明
                   // 线程B还未开始执行轮询任务,可以直接夺回
                   // 如果线程B已经开始轮询
                   // 则选择离开
                    if (_state == State.IDLE)
                    {
                        // 返回true则继续轮询
                        return true;
                    }
                }
                //返回false则结束轮询任务,变为空闲线程
                return false;
复制代码

总结

相较于循规蹈矩的Tomcat,Jetty的设计更为激进,更富有冒险主义者的精神,从个人角度来说更喜欢Jetty的设计,但从业务的角度来说还是选择Tomcat较为稳妥毕竟稳定是业务的基本需求,并且Tomcat的性能也不会太差。

以线程的类别来进行划分的话, Jetty的NIO模型如下图所示

  • Acceptor 线程负责接收来自客户端的新连接,并将其封装成一个事件提交给轮询线程处理
  • 轮询线程 轮询线程处理负责轮询I/O事件之外,还需要处理外部线程所提交的selector更新任务,并且根据设定的执行策略,轮询线程可能会在本线程直接执行I/O任务,并将轮询任务移交给其他空闲的线程,或者选择一个空闲的线程来执行I/O操作
  • I/O线程 主要负责处理I/O操作

从线程类别的角度来看Jetty的NIO模型相对简单,但其引入的轮询线程执行策略使得线程之间身份可以发生转变, 得益于此Jetty可以直接轮询线程直接执行I/O任务减少了线程上下文切换所带来的性能消耗,提升了性能。

思想迁移

切换线程是有成本的 Jetty通过直接在轮询线程执行I/O任务来提升性能,来减少线程上下文的切换,除此之外,我们还可以实现协程的机制来减少线程上下文切换所带来的成本(参考Go语言)

Acceptor线程应适量 如果将ServerSocket设置为阻塞模式,那么accept操作将导致线程陷入阻塞,从accept方法返回时将引起线程上下的切换,因此并不是越多越好

如何Debug Jetty

我们使用SpringBoot来Debug Jetty,因此我们需要在pom.xml中引入Jetty,由于SpringBoot默认使用Tomcat因此我们需要将其替换掉,依赖如下所示.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
复制代码

使用的SpringBoot版本是2.2.0其所依赖的Jetty版本号是9.4.20

  • 如果你要了解Connector是如何工作的请关注以下类 org.eclipse.jetty.server.ServerConnector

  • 如果你想要了解Jetty NIO 如何轮询以及处理事件,那么请关注以下类 org.eclipse.jetty.io.ManagedSelector 并在其内部类 SelectorProducerproduce方法打上断点,如下图所示,你将了解到整个轮询过程中都发生了什么

右键小红点,选择Thread,以避免进入不了断点的情况,毕竟我们调试的是多线程程序

  • 如果你想要了解线程执行的策略,那么请关注以下类(此类执行机制较为复杂,如果想Debug到所有的情况,最好结合一定的策略,如在Controller代码处阻塞住线程等) org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
关注下面的标签,发现更多相似文章
评论