java多线程系列之synchronousQueue

8,021 阅读7分钟

synchronousQueue详解

在使用cachedThreadPool的时候,没有对原理去很好的理解,所以导致使用起来有些不放心,主要是对synchronousQueue的原理不太了解,所以有此文的分析

本文从两个方面分析synchronousQueue:

  1. synchronousQueue的使用,主要是通过Executors框架提供的线程池cachedThreadPool来讲,因为synchronousQueue是它的workQueue
  2. synchronousQueue的原理,主要是从实现角度,分析一下数据结构

synchronousQueue基本使用

首先说说api吧,关于队列有几套api,核心是下面的两套:

take() & put() //这是阻塞的,会阻塞操作线程
poll() & offer() //这是非阻塞的(在不设置超时时间的前提下),当操作不能达成的时候会立马返回boolean

synchronousQueue是一个没有数据缓冲的阻塞队列,生产者线程对其的插入操作put()必须等待消费者的移除操作take(),反过来也一样。

但是poll()和offer()就不会阻塞,举例来说就是offer的时候如果有消费者在等待那么就会立马满足返回true,如果没有就会返回false,不会等待消费者到来。

下面我们分析一下cachedThreadPool的使用流程,通过这个过程我们来了解synchronousQueue的使用方式:先看代码

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//1
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  1. 对于使用synchronousQueue的线程池,在第一次execute任务的时候会在1处返回false,因为线程池中还没有线程,所以没有消费者在等待,所以就会直接创建线程进行执行任务
  2. 在上篇线程池的分析中我们提到:创建的线程在执行完毕任务后会去循环的getTask,在getTask的过程中会调用take去获取任务。所以当我们再次调用execute提交任务的时候1就会返回成功(前提是先前创建的线程已经执行完毕,正在执行gettask方法进行等待),因为这个时候已经有一个线程在等待task了,所以offer直接返回成功!
  3. 这就达到了cachedThreadPool线程复用的目的,也就是说:在提交任务的时候,如果所有工作线程都处于忙碌的状态就会新建线程来执行,如果有工作线程处于空闲状态则把任务交给空闲线程来执行!而这其中的黑科技就是通过synchronousQueue来进行的。

synchronousQueue内部数据结构

根据上面我们介绍的synchronousQueue的队列语义,我们其实可以很容易的通过锁或者信号量等一系列的同步机制来实现一个synchronousQueue的结构,但是我们知道有锁的一般效率都不会太高,所以java为我们提供了下面一种无锁的算法。

无锁在java里面一般就是cas和spin来实现的!具体会在之后介绍java并发包的时候来分析TODO。下面的分析的核心在于:cas和spin,一般把cas和spin可以组合起来使用,spin就是不断循环重试cas操作,确保操作能够成功。这些就不详细介绍,网上有很多相关文章!这也是java并发的基础!

稍微跟踪一下代码,就会发现synchronousQueue内部是通过Transferer来实现的,具体分为两个Transferer,分别是TransferStack和TransferQueue,两者差别在于是否公平:下面我们只分析TransferQueue的实现。

        /** Node class for TransferQueue. */
        static final class QNode { //1
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {//2
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {//2
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
         }
  1. 既然是队列,肯定有个node是代表队列的节点的。
  2. 2处代表了典型的两个cas赋值操作,代表了如何设置next和item的值,用于进行并发更新

之后是transfer操作

   E transfer(E e, boolean timed, long nanos) {//1
            QNode s = null; 
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)       
                    continue;                       

                if (h == t || t.isData == isData) { //2
                    QNode tn = t.next;
                    if (t != tail)                 
                        continue;
                    if (tn != null) {               
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);//3
                    if (!t.casNext(null, s))       //4
                        continue;

                    advanceTail(t, s);              // 5
                    Object x = awaitFulfill(s, e, timed, nanos);//6
                    if (x == s) {                  
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {          
                        advanceHead(t, s);        
                        if (x != null)            
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                           //7
                    QNode m = h.next;            //8
                    if (t != tail || m == null || h != head)
                        continue;                   

                    Object x = m.item;
                    if (isData == (x != null) ||   
                        x == m ||                  
                        !m.casItem(x, e)) {         // 9
                        advanceHead(h, m);          
                        continue;
                    }

                    advanceHead(h, m);              // 10
                    LockSupport.unpark(m.waiter);//11
                    return (x != null) ? (E)x : e;
                }
            }
        }
  Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {//6.1
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)//6.2
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);//6.3
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

在上面的代码中,我把重要的地方分了11步,分别进行解释:

首先说一下大致的操作。在transfer中,把操作分为两种,一种就是入队put,一种是出队take,入队的时候会创建data节点,值为data。出队的时候会创建一个request节点,值为null。

  1. put和take操作都会调用该方法,区别在于,put操作的时候e值为数据data,take操作的时候e值为null

  2. 如果h==t也就是队列为空,或者当前队列尾部的数据类型和调用该方法的数据类型一致:比如当前队列为空,第一次来了一个入队请求,这时候队列就会创建出一个data节点,如果第二次又来了一个入队请求(和第一次也就是队列尾部的数据类型一致,都是入队请求),这时候队列会创建出第二个data节点,并形成一个链表。同理,如果刚开始来了request请求,也会入队,之后如果继续来了一个reqeust请求,也会继续入队!

  3. 满足2的条件,就会进入3,中间会有一些一致性检查这也是必须的,避免产生并发冲突。3会创建出一个节点,根据e值的不同,可能是data节点或者request节点。

  4. 把3中创建的节点通过cas方式设置到队列尾部去。

  5. 把tail通过cas方式修改成3中新建立的s节点

  6. 调用方法awaitFulfill进行等待,如果3中创建的是data节点,那么就会等待来一个reqeust节点,反之亦然!

    1. 放入队列之后就开始进行循环判断
    2. 终止条件是节点的值被修改,具体如果是data节点,那么会被修改成null,如果是request节点,那么会被修改成data值。这个修改是在第9步中由相对的请求(如果创建的是data节点,那么就由reqeust请求来进行修改,反之亦然)来做的。如果一直没有相对的请求过来,那么节点的值就一直不会被修改,这样就跳不出循环体!
    3. 如果没有被修改,那么就需要进入park休眠,等待第9步进行修改后再通过unpark进行唤醒,唤醒之后就会判断节点值被修改从而返回。
  7. 如果在插入一个节点的时候,不满足2的条件,也就是队列不为空并且尾部节点和当前要插入节点的类型不一样(这就代表来了一个相对请求),比如上图中的尾部是data节点,如果来了一个插入reqeust节点的请求,那么就会走到7这里
  8. 由于是队列,先进先出,所以会取队列里面的第一个节点,也就是h.nex
  9. 把8中取出的节点的值通过cas的方式设置成新来节点的e值,这样就成功的满足了6-2的终止条件
  10. 将head节点往后移动,这样就把第一个节点成功的出队。
  11. 每个节点都保存了对应的操作线程,将8中节点对应的线程进行唤醒,这样6-3处于休眠的线程就醒来了,然后继续进行for循环,进而判断6-2终止条件满足,于是返回

整个过程就是这样,上文的分析只是分析了正常的工作流程,没有具体的分析操作中的竞态条件,比如两个线程同时进行入队的时候如何正确设置链表的状态,都讲的话篇幅过大。