阅读 116

Phaser源码分析

一、简介

          Phaser是java7中引入的,是并发包中提供的线程同步辅助工具类,是个阶段屏障, 所有parties线程需等待此阶段屏障的所有线程都到达,才能进入下一阶段屏障持续执行。 CyclicBarrier、CountDownLatch、Phaser 三个都是线程同步辅助工具类,同步辅助三剑客。CountDownLatch不能重用,CyclicBarrier、Phaser都可以重用,并且Phaser 更加灵活可以在运行期间随时加入(register)新的parties,也可以在运行期间随时退出(deregister),支持层级,根root Phaser,父Phaser把每个子的 Phaser(每个子的内部也有多个parties线程,也有多阶段) 当做父Phaser的一个parties,父Phaser等待所有的parties都到达父的阶段屏障, 即子Phaser的所有阶段都执行完,即子Phaser都到达父的阶段屏障。唤醒所有的子Phaser的parties线程继续执行下一阶段。 CyclicBarrier也可以做为阶段屏障使用,每个线程重复做为CyclicBarrier的parties,但是没办法想Phaser那样支持层级。 例如比赛,一个比赛分为3个阶段(phase): 初赛、复赛和决赛,规定所有运动员都完成上一个阶段的比赛才可以进行下一阶段的比赛,并且比赛的过程中允许退赛(deregister),这个场景就很适合Phaser,这个例子的话CyclicBarrier也可以实现,如果是更复杂如奥运闭幕(父Phaser),需等待各种比赛结束,如跳远(子Phaser,也有多个阶段,初赛、复赛和决赛),田径(子Phaser,也有多个阶段,初赛、复赛和决赛) 等,Phaser就更加适合,你也可以把重造轮子写个CyclicBarrier子类,改造成类似Phaser支持层级的功能。CyclicBarrier可以看我的另一篇源码分析 juejin.im/post/5d3bf8…,CountDownLatch可以看我的另一篇juejin.im/post/5d3593…。  

二、属性

/**
 * Phaser中的状态,64位的属性state不同位被用来存放不同的值,低16位存放unarrived,低32位中的高16位存放parties,高32位的低31位存放phase,最高位存放terminated,即Phaser是否关闭
 * 
 * unarrived  -- 还没到达阶段屏障的参与者数量,可以在运行期间增加,属性state的低16位存放 (bits  0-15)
 * parties    -- Phaser中的参与者,可以在运行期间增加,低32位中的高16位存放parties,每次进入下一阶段unarrived会被重新赋值为parties值   (bits 16-31)
 * phase      -- 记录Phaser的当前阶段,只有所有的参与者都到达阶段屏障,才能进入下一阶段屏蔽,高32位的低31位存放phase(bits 32-62)
 * terminated -- 属性state最高位存放terminated,即Phaser是否关闭             (bit  63 / sign)
 */
private volatile long state;
//Phaser中的最大参与者值
private static final int  MAX_PARTIES     = 0xffff;
//Phaser中的最大阶段值,属性state的高32位的低31位存放phase记录Phaser的当前阶段的最大值private static final int  MAX_PHASE       = Integer.MAX_VALUE;
//属性state的低32位中的高16位存放parties,即Phaser中的参与者,在下面获取parties时,需要用到的移位操作值
private static final int  PARTIES_SHIFT   = 16;
//属性state的高32位的低31位存放记录Phaser的当前阶段值,在下面获取phase时,需要用到的移位操作值
private static final int  PHASE_SHIFT     = 32;
//属性state的低32位中的低16位存放未抵达当前阶段屏障的参与者个数,与UNARRIVED_MASK值&得未抵达当前阶段屏障的参与者个数
private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
//属性state的低32位中的高16位存放parties,即Phaser中的参与者,与PARTIES_MASK值&得屏障的参与者个数
private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
//属性state的低32位中的低16位存放未抵达当前阶段屏障的参与者个数,低32位中的高16位存放parties,即Phaser中的参与者,与COUNTS_MASK值&得unarrived和parties两部分值     
private static final long COUNTS_MASK     = 0xffffffffL;
//属性state最高位存放terminated,即Phaser是否关闭,如果想关闭屏蔽state属性值或(|)上TERMINATION_BIT值
private static final long TERMINATION_BIT = 1L << 63;

//阶段屏障的一个参与者到达,即unarrived值减1
private static final int  ONE_ARRIVAL     = 1;
//一个参与者,属性state的低32位的高16位存放parties,只在ONE_DEREGISTER中使用到  
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
//屏障的一个参与者取消注册,即一个参与者退出屏障,即属性state值的低32位的高16位parties值减1,低16位unarrived值减1
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;

//如果在构造Phaser时,传入进来的参与者个数parties等于0,属性state的初始值
private static final int  EMPTY           = 1;

//Phaser支持层级,父Phaser把每个子的Phaser(每个子的内部也有多个parties线程,也有多阶段)当做父Phaser的一个parties,父Phaser等待所有的parties都到达父的阶段屏障, 即子Phaser的所有阶段都执行完,即子Phaser都到达父的阶段屏障。唤醒所有的子Phaser的parties线程继续执行下一阶段
private final Phaser parent;

//不管是有层级还是无层级的Phaser的根Phaser root都是同一个,没有层级是自身,有层级就是父的root
private final Phaser root;
//偶链表,Phaser屏障是有多个阶段,为了防止竞争,偶数的阶段采用偶链表,一个参与者在到达阶段屏障时,还有其他参与者还未到达,自旋一段时间,其余参与者还未到达,将其封装成节点加入链表中
private final AtomicReference<QNode> evenQ;
//奇链表
private final AtomicReference<QNode> oddQ;
//获得可用的处理器个数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//一个参与者在到达阶段屏障时,还有其他参与者还未到达,参与者线程不是直接进入等待状态而是先自旋一段时间,自旋值根据处理器个数
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
//UnSafe的使用,如果不清楚的话,可以看https://juejin.im/user/5bd8718051882528382d8728/shares中的UnSafe介绍
private static final sun.misc.Unsafe UNSAFE;
//属性state的相对偏移量,相对Phaser实例的起始内存位置的相对偏移量,定义成静态的原因是,属性的相对实例的偏移量都是相等的
private static final long stateOffset;
static {
        try {
            //获取UnSafe实例
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            //Phaser的Class信息
            Class<?> k = Phaser.class;
            //使用UnSafe实例获取Phaser类的属性state的相对偏移量
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
        } catch (Exception e) {
            throw new Error(e);
        }
}复制代码

三、内部类

//一个参与者在到达阶段屏障时,还有其他参与者还未到达,自旋一段时间,其余参与者还未到达,将其封装成节点QNode加入链表中
static final class QNode implements ForkJoinPool.ManagedBlocker {
        //Phaser实例,需拿到state的阶段值phaser,由于state用volatile修饰,java内存模型加上happen-before,保证state的写对后续的读可见
        final Phaser phaser;
        //当前节点所处的屏障Phaser阶段
        final int phase;
        //当前等待节点线程是否可被中断
        final boolean interruptible;
        //当前等待节点线程是否超时等待其他未到达阶段屏障参与者线程到达阶段屏障 
        final boolean timed;
        //当前等待节点线程在等待的过程中是否被中断
        boolean wasInterrupted;
        //当前等待线程如果支持超时等待,等待的超时时间,单位纳秒
        long nanos;
        //当前等待节点等待的超时的时间点
        final long deadline;
        //当前等待节点的线程,可能为空,当前节点被取消等待
        volatile Thread thread; // nulled to cancel wait
        //由于QNode是个链表,当前等待节点的下一等待节点
        QNode next;
        
        //QNode构造函数,传入phaser实例,phase当前等待节点所处的阶段,interruptible当前节点线程是否可被中断,timed当前等待节点线程是否超时等待,nanos当前等待线程如果支持超时等待,等待的超时时间
        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            this.deadline = timed ? System.nanoTime() + nanos : 0L;
            thread = Thread.currentThread();
        }
        
        //当前等待节点是否被释放
        public boolean isReleasable() {
            //如果等待节点的线程为空,表明当前等待节点被释放
            if (thread == null)
                //返回true,表明当前等待节点被释放
                return true;
            //如果当前等待节点的屏障阶段和当前的phaser的阶段不一致,表明当前等待节点被释放 
            if (phaser.getPhase() != phase) {
                //将等待节点对应的线程置为空
                thread = null;
                //返回true,表明当前等待节点被释放               
                return true;
            }
            //如果节点对应的线程被中断
            if (Thread.interrupted())
                //wasInterrupted置为true,表明节点对应线程被中断
                wasInterrupted = true;
            //如果节点对应线程被中断,并且当前节点支持线程可中断 
            if (wasInterrupted && interruptible) {
                //将等待节点对应的线程置为空
                thread = null;
                //返回true,表明当前等待节点被释放
                return true;
            }
            //当前等待节点线程超时等待其他未到达阶段屏障参与者线程到达阶段屏障 
            if (timed) {
                //属性nanos超时时间大于0
                if (nanos > 0L) {
                    //节点的超时时间点减去当前时间,获得新的超时时间
                    nanos = deadline - System.nanoTime();
                }
                //如果属性nanos超时时间小于0,表明当前等待节点线程等待其余参与者线程到达阶段屏障超时
                if (nanos <= 0L) {
                    //将等待节点对应的线程置为空
                    thread = null;
                    //返回true,表明当前等待节点被释放
                    return true;
                }
            }
            //返回false,表明当前等待节点还未被释放
            return false;
        }
        
        //阻塞节点线程,线程被唤醒时,判断节点是否被释放
        public boolean block() {
            //调用上面的isReleasable判断节点是否被释放 
            if (isReleasable())
                //返回true,节点被释放
                return true;
            //如果节点线程不支持超时等待其余参与者线程到达阶段屏障,LockSupport.park阻塞节点线程,直到调用unpark唤醒
            else if (!timed)
                //调用LockSupport阻塞节点线程
                LockSupport.park(this);
            //如果节点支持超时等待其余参与者线程到达阶段屏障,并且超时时间大于0
            else if (nanos > 0L)
                //调用LockSupport.parkNanos超时的等待其余参与者线程到达阶段屏障,直到超时,或者调用unpark唤醒
                LockSupport.parkNanos(this, nanos);
            //调用上面的isReleasable判断节点是否被释放  
            return isReleasable();
        }
}复制代码

四、构造函数

/**
 * 创建新的屏障Phaser实例,无参构造Phaser实例,没有注册parties参与者,并且没有父Phaser,初始屏障阶段数字0
 */
public Phaser() {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为null,parties参与者初始化为0
        this(null, 0);
}

/**
 * 创建新的屏障Phaser实例,注册parties参与者数parties,没有父Phaser,初始屏障阶段数字0
 *
 * @param parties 初始参与者数
 */
public Phaser(int parties) {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为null,parties参与者初始化为parties
        this(null, parties);
}

/**
 * 创建新的屏障Phaser实例,没有注册parties参与者,传入父Phaser,初始屏障阶段数字0
 * 
 * @param parent 父Phaser
 */
public Phaser(Phaser parent) {
        //调用下面Phaser(Phaser parent, int parties)构造函数,父Phaser为parent,parties参与者初始化为0 
        this(parent, 0);
}

/**
 * 创建新的屏障Phaser实例,注册parties参与者,传入父Phaser,初始屏障阶段数字0
 */
public Phaser(Phaser parent, int parties) {
        //如果传入parties是负数,或者值超过MAX_PARTIES,都会抛出IllegalArgumentException异常 
        if (parties >>> PARTIES_SHIFT != 0)
            //抛出IllegalArgumentException异常
            throw new IllegalArgumentException("Illegal number of parties");
        //初始屏障阶段为0
        int phase = 0;
        //属性值parent赋值为传入进来的父Phaser parent
        this.parent = parent;
        //如果传入的父Phaser不为空,表明当前Phaser实例支持层级
        if (parent != null) {
            //获取父的root phaser
            final Phaser root = parent.root;
            //将当前Phaser实例根root phaser赋值为父Phaser的root phaser
            this.root = root;
            //将当前Phaser实例偶链表赋值为根root phaser的偶链表 
            this.evenQ = root.evenQ;
            //将当前Phaser实例奇链表赋值为根root phaser的奇链表
            this.oddQ = root.oddQ;
            //注册parties参与者不等于0,表明有注册parties参与者 
            if (parties != 0)
                //父Phaser把每个子的 Phaser(每个子的内部也有多个parties线程,也有多阶段)当做父Phaser的一个parties,为此需要调用父的doRegister方法将当前子Phaser当做一个parties注册到父Phaser中,doRegister方法在下面进行介绍
                phase = parent.doRegister(1);
        }
        //如果传入的父Phaser为空,表明当前Phaser实例不支持层级
        else {
            //将当前Phaser实例根root phaser赋值为自身
            this.root = this;
            //初始化一个新的偶链表
            this.evenQ = new AtomicReference<QNode>();
            //初始化一个新的奇链表 
            this.oddQ = new AtomicReference<QNode>();
        }
        //初始化属性state,如果没有注册的parties参与者,将属性state值赋值为EMPTY
        this.state = (parties == 0) ? (long)EMPTY :
            //否则将phaser左移32位,或(|)上 ,将parties左移16位,或上parties值,即unarrived初始为parties
            ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties);
}复制代码

五、注册参与者

/**
 * 在Phaser运行期间注册一个参与者
 */
public int register() {
        //调用下面介绍的doRegister方法注册一个参与者到Phaser实例中 
        return doRegister(1);
}

//@param registrations 注册参与者数目
private int doRegister(int registrations) {
        // adjustment to state unarrived
        //由于注册参与者,需要调整parties和unarrived的值,为此registrations需左移16位得到parties值,或(|)上registrations(unarrived值)    
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        //当前Phaser实例获取父Phaser实例
        final Phaser parent = this.parent;
        //记录当前屏障处于第几个阶段
        int phase;
        //循环,直到注册参与者成功 
        for (;;) {
            //如果当前Phaser实例没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),否则调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致
            long s = (parent == null) ? state : reconcileState();
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值  
            int counts = (int)s;
            //counts高16位存放parties值,低16位存放unarrived值,无符号右移16位获取到parties值
            int parties = counts >>> PARTIES_SHIFT;
            //counts低16位存放unarrived值,counts与(&)上UNARRIVED_MASK(0xffff),获取到低16位存放unarrived值 
            int unarrived = counts & UNARRIVED_MASK;
            //传入进来要注册参与者数目是否大于Phaser实例允许注册的参与者数目,即传入进来要注册的参与者数目加上已经注册的参与者数目大于Phaser实例允许的最大参与者数目MAX_PARTIES
            if (registrations > MAX_PARTIES - parties)
                //传入进来要注册参与者数目大于Phaser实例允许注册的参与者数目,抛出IllegalStateException异常,badRegister方法看下面介绍
                throw new IllegalStateException(badRegister(s));
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值
            phase = (int)(s >>> PHASE_SHIFT);
            //如果phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,如果terminated为1,Phaser关闭
            if (phase < 0)
                //如果Phaser关闭,直接退出循环
                break;
            //如果counts(高16位存放parties值,低16位存放unarrived值)不等于EMPTY,即当前Phaser实例不是注册第一个参与者
            if (counts != EMPTY) {                  // not 1st registration
                //如果当前Phaser实例不为空,或者属性state值没有改变
                if (parent == null || reconcileState() == s) {
                    //如果当前阶段屏障Phaser实例的unarrived等于0,表明当前所有参与者都到达阶段屏障,只是还没使用UnSafe的compareAndSwapLong将unarrived更新成parties,即还没使用CAS将属性state值更新成新的state值,调用internalAwaitAdvance方法先自旋一段等待Phaser实例的进入到下一阶段,然后将当前注册的参与者注册到下一阶段中,否则的话进入等待状态直到Phaser实例进入到下一阶段,internalAwaitAdvance方法可以看下面的介绍
                    if (unarrived == 0)             // wait out advance
                        //使用根root Phaser实例调用下面介绍的internalAwaitAdvance方法,传入当前Phaser实例阶段和节点为null
                        root.internalAwaitAdvance(phase, null);
                    //如果当前Phaser实例的unarrived不等于0,使用UnSafe的cas将传入进来的参与者数目加到属性state中
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        //退出当前循环
                        break;
                }
            }
            //如果counts等于EMPTY,并且当前Phaser实例没有父Phaser,表明还没注册参与者,当前注册的参与者是第一批
            else if (parent == null) {              // 1st root registration
                //使用phase(阶段值)移位32位或(|)上adjust(高16位存放parties值,低16位存放unarrived值)的调整值
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                //使用UnSafe的cas将属性state值更新成新的值next 
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    //退出循环
                    break;
            }
            //如果counts等于EMPTY,并且当前Phaser实例有父Phaser,表明还没注册参与者,当前注册的参与者是第一批,当前Phaser实例做为父Phaser实例的一个parties
            else {
                //加锁,因为存在并发操作,当前Phaser实例做为锁对象
                synchronized (this) {               // 1st sub registration
                    //可能多个线程同时进入到else中,为此需要重新检查下属性state值,因为当前Phaser实例只能被当做一次父Phaser的一个parties参与者
                    if (state == s) {               // recheck under lock
                        //将当前Phaser实例做为父Phaser实例的一个parties参与者
                        phase = parent.doRegister(1);
                        //如果父Phaser的阶段值phase小于0,直接退出循环
                        if (phase < 0)
                            //退出循环
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        //循环直到cas将注册进来的参与者注册到当前Phaser实例
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            //使用CAS更新当前Phaser实例的属性state为新的值失败,重新获取新的属性state值
                            s = state;
                            //重新从属性state值中获取到phase值,重新循环
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        //退出循环
                        break;
                    }
                }
            }
        }
        //返回当前Phaser实例的phase阶段值 
        return phase;
}

//调整当前Phaser实例的属性state
private long reconcileState() {
        //获取当前实例Phaser的根root Phaser实例 
        final Phaser root = this.root;
        //获取当前实例Phaser的属性state值 
        long s = state;
        //如果当前实例Phaser有层级,即root Phaser不是自身实例 
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            //如果当前Phaser实例的阶段值phase和根root phase的阶段值不一致,使用CAS将当前Phaser实例属性state替换成新的state值,即当前Phaser实例phase阶段值和root phaser的阶段值一致
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                //重新获取当前Phaser实例的属性state值
                s = state;
        }
        //返回调整完的当前Phaser实例的属性state值
        return s;
}

//传入进来要注册参与者数目大于Phaser实例允许注册的参与者数目
private String badRegister(long s) {
        return "Attempt to register more than " +
            MAX_PARTIES + " parties for " + stateToString(s);
}

/**
 * 在Phaser实例运行期间批量注册参与者
 */
public int bulkRegister(int parties) {
        //如果批量注册参与者数目小于0,抛出IllegalArgumentException异常
        if (parties < 0)
            throw new IllegalArgumentException();
        //如果批量注册参与者数目等于0
        if (parties == 0)
            //返回根root Phaser实例的阶段值,即根root Phaser实例的phase值
            return getPhase();
        //调用上面介绍的doRegister方法注册批量参与者到Phaser实例中  
        return doRegister(parties);
}

/**
 * 获取Phaser实例的阶段值,如果不支持层级root就是自身Phaser实例
 */
public final int getPhase() {
        //root Phaser实例state属性高32位的低31位存放phase,为此需要右移32位得到phase值
        return (int)(root.state >>> PHASE_SHIFT);
}
复制代码

六、线程到达阶段屏障

/**
 * 一个参与者到达阶段屏障
 */
public int arrive() {
        //调用下面介绍的doArrive方法,将属性state中的unarrived做减1操作,如果是最后一个到达还需唤醒其他等待其他参与者都到达阶段屏障的参与者线程
        return doArrive(ONE_ARRIVAL);
}

//传入adjust调整值,可能是ONE_ARRIVAL或者ONE_DEREGISTER
private int doArrive(int adjust) {
        //获取根root Phaser实例 
        final Phaser root = this.root;
        //循环,直到属性state中的unarrived值做减1操作成功,如果adjust是ONE_DEREGISTER,直到属性state中的unarrived和parties都做减1操作成功  
        for (;;) {
            //如果根root Phaser实例就是自身Phaser实例,即当前Phaser实例没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),否则调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致
            long s = (root == this) ? state : reconcileState();
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值
            int phase = (int)(s >>> PHASE_SHIFT);
            //如果phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,如果terminated为1,Phaser关闭
            if (phase < 0)
                //如果Phaser关闭,直接返回phase
                return phase;
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
            int counts = (int)s;
            //如果counts(高16位存放parties值,低16位存放unarrived值)等于EMPTY,即当前Phaser实例还未注册参与者,否则的话counts & UNARRIVED_MASK获取到低16位存放unarrived值 
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            //如果unarrived小于等于0,表明还没注册一个参与者,就有参与者到达阶段屏障,抛出IllegalStateException异常,badArrive看下面介绍
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //使用CAS更新当前Phaser实例的属性state值,即属性state值更改为减去调整值的新值
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                //如果当前参与者是最后一个到达阶段屏障
                if (unarrived == 1) {
                    //获取到属性state的parties值,做为unarrived的值
                    long n = s & PARTIES_MASK;  // base of next state
                    //将parties值左移16位做为下一阶段的unarrived的值
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    //如果当前Phaser实例没有层级,即根root Phaser实例为自身Phaser实例  
                    if (root == this) {
                        //调用onAdvance方法,判断当前Phaser实例是否需要关闭,即当前Phaser实例阶段是否都执行完,这个onAdvance方法应该被子类重写,内部可以自定义各个不同阶段执行的方法,比如switch{0:阶段0,1:阶段1,2:阶段2},如果返回true表明当前Phaser实例需要关闭
                        if (onAdvance(phase, nextUnarrived))
                            //如果onAdvance返回true将属性state值的最高位置为1
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            //如果下一阶段的unarrived的值为0,将属性state值的低32位置为EMPTY
                            n |= EMPTY;
                        else
                            //否则的话,将属性state值的低16位unarrived置为nextUnarrived 
                            n |= nextUnarrived;
                        //将phase阶段值加1做为下一阶段值
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        //重新计算属性state值,将下一阶段值左移32位,或(|)上低32位的parties值和unarrived值
                        n |= (long)nextPhase << PHASE_SHIFT;
                        //使用UnSafe的cas更新当前Phaser实例的属性state  
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        //唤醒所有其他等待其他参与者都到达阶段屏障的参与者线程,releaseWaiters方法看下面介绍
                        releaseWaiters(phase);
                    }
                    //当前Phaser实例有层级,即有父Phaser,如果nextUnarrived等于0,即当前Phaser实例没有参与者
                    else if (nextUnarrived == 0) { // propagate deregistration
                        //由于当前Phaser实例做为父Phaser实例的一个parties参与者,当前Phaser实例没有参与者,需将当前Phaser实例做为参与者从父Phaser实例中注销掉
                        phase = parent.doArrive(ONE_DEREGISTER);
                        //使用UnSafe的cas将属性state值的低32位置为EMPTY
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    //当前Phaser实例有层级,即有父Phaser,并且nextUnarrived不等于0,即当前Phaser实例有参与者,由于当前Phaser实例做为父Phaser实例的一个parties参与者,为此当前Phaser实例的一个阶段屏障参与者都到达,当前Phaser实例做为父Phaser实例的一个参与者到达
                    else
                        //当前Phaser实例做为父Phaser的一个参与者到达
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                //返回当前所处的阶段值 
                return phase;
            }
        }
}

//如果unarrived小于等于0,表明还没注册一个参与者,就有参与者到达阶段屏障
private String badArrive(long s) {
        return "Attempted arrival of unregistered party for " +
            stateToString(s);
}

//控制Phaser实例是否继续执行下一阶段,可以自定义子类继承Phaser控制Phaser需要执行几个阶段,否则registeredParties不等于0,Phaser会一直执行下去
protected boolean onAdvance(int phase, int registeredParties) {
        //如果registeredParties等于0,Phaser实例需要关闭
        return registeredParties == 0;
}

/**
 * 唤醒所有其他等待其他参与者都到达阶段屏障的节点(参与者)线程
 * 
 * @param phase Phaser实例阶段值
 */
private void releaseWaiters(int phase) {
        //等待节点链表的第一个节点元素 
        QNode q;   // first element of queue
        //节点对应的线程
        Thread t;  // its thread
        //根据传入进来的阶段值获取操作的是偶链表还是奇链表
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        //循环的获取头节点,头节点不为空并且节点的阶段值和根的阶段值不相等,表明所有的参与者都到达传入进来的阶段屏障
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            //CAS将头节点设置为头节点的下一节点,如果头节点设置为下一节点成功,并且头节点的线程不为空
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                //将头节点的线程置为空
                q.thread = null;
                //唤醒头节点线程,重新循环,直到链表为空或节点的阶段值和根的阶段值相等
                LockSupport.unpark(t);
            }
        }
}

/**
 * 一个参与者到达阶段屏障并且从屏障中注销移除,属性state中的parties和unarrived值都需要减1,如果是最后一个到达还需唤醒其他等待其他参与者都到达阶段屏障的参与者线程,如果一个参与者从屏障移除,并且屏障没有其他参与者即属性state值的parties等于0,将属性state的低32位置成EMPTY
 */
public int arriveAndDeregister() {
        //调用上面介绍的doArrive方法,将属性state中的unarrived做减1操作,如果是最后一个到达还需唤醒其他等待其他参与者都到达阶段屏障的参与者线程,并且屏障没有其他参与者即属性state值的parties等于0,将属性state的低32位置成EMPTY
        return doArrive(ONE_DEREGISTER);
}
复制代码

七、等待阶段屏障其余参与者线程全部到达

/**
 * 一个参与者到达阶段屏障并且等待其他参与者到达阶段屏障
 */
public int arriveAndAwaitAdvance() {
        //获取根root Phaser实例
        final Phaser root = this.root;
        //循环直到其他参与者都到达阶段屏障
        for (;;) {
            //如果根root Phaser实例就是自身Phaser实例,即当前Phaser实例不支持层级没有父Phaser,获取当前Phaser实例的属性state值(terminated、phase、parties、unarrived),否则调用reconcileState()方法,reconcileState是调整当前Phaser实例的属性state中的phase值与root的一致 
            long s = (root == this) ? state : reconcileState();
            //属性state高32位的低31位存放phase(Phaser实例的阶段值),属性state值无符号右移32位获取到phase值 
            int phase = (int)(s >>> PHASE_SHIFT);
            //如果phase值小于0,表示属性state的最高位为1,属性state最高位存放terminated,即Phaser是否关闭,如果terminated为1,Phaser关闭
            if (phase < 0)
                //如果Phaser关闭,直接返回phase
                return phase;
            //获取state属性值的低32位,低32位的高16位存放parties值,低16位存放unarrived值
            int counts = (int)s;
            //如果counts(高16位存放parties值,低16位存放unarrived值)等于EMPTY,即当前Phaser实例还未注册参与者,否则的话counts & UNARRIVED_MASK获取到低16位存放unarrived值 
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            //如果unarrived小于等于0,表明还没注册一个参与者,就有参与者到达阶段屏障,抛出IllegalStateException异常,badArrive看上面介绍
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //使用UnSafe的cas将属性state更新成新的值,即属性state中的unarrived值-1,如果更新失败重新循环 
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                //如果unarrived大于1,表明还有其他参与者还未到达阶段屏障
                if (unarrived > 1)
                    //根root Phser实例(如果没有层级root就是自身Phaser实例)调用下面的internalAwaitAdvance方法,让当前线程先自旋一段时间,如果自旋一段时间其余参与者还未到达阶段屏障,封装成节点,加入到等待链表中,等待被唤醒,可以看下面对此方法的介绍
                    return root.internalAwaitAdvance(phase, null);
                //如果根root Phaser不是自身Phaser实例,表明当前Phaser实例支持层级,当前Phaser实例做为父Phaser实例的一个参与者注册进去,为此当前Phaser实例(每个子的内部也有多个parties线程,也有多阶段))的一个阶段执行完成,做为父Phaser的一个参与者,表明当前参与者到达父的一个阶段屏障
                if (root != this)
                    //父Phaser把每个子的Phaser(每个子的内部也有多个parties线程,也有多阶段)当做父Phaser的一个parties参与者,为此当子Phaser实例的一个阶段执行完成表明子Phaser实例参与者也到达父Phaser实例的一个阶段
                    return parent.arriveAndAwaitAdvance();
                //获取到属性state的parties值,做为unarrived的值 
                long n = s & PARTIES_MASK;  // base of next state
                //将parties值左移16位做为下一阶段的unarrived的值
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                //调用onAdvance方法,判断当前Phaser实例是否需要关闭,即当前Phaser实例阶段是否都执行完,这个onAdvance方法应该被子类重写,内部可以自定义各个不同阶段执行的方法,比如switch{0:阶段0,1:阶段1,2:阶段2},如果返回true表明当前Phaser实例需要关闭
                if (onAdvance(phase, nextUnarrived))
                    //如果onAdvance返回true将属性state值的最高位置为1,关闭Phaser实例
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    //如果下一阶段的unarrived的值为0,将属性state值的低32位置为EMPTY 
                    n |= EMPTY;
                else
                    //否则的话,将属性state值的低16位unarrived置为nextUnarrived   
                    n |= nextUnarrived;
                //将phase阶段值加1做为下一阶段值
                int nextPhase = (phase + 1) & MAX_PHASE;
                //重新计算属性state值,将下一阶段值左移32位,或(|)上低32位的parties值和unarrived值  
                n |= (long)nextPhase << PHASE_SHIFT;
                //使用UnSafe的cas更新当前Phaser实例的属性state,如果更新失败,表明有其他线程调用关闭Phaser实例,因为CAS更新属性state值只有最后一个参与者到达阶段屏障才会执行
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                //唤醒所有其他等待其他参与者都到达阶段屏障的参与者线程,releaseWaiters方法看上面介绍               
                releaseWaiters(phase);
                //返回下一阶段值 
                return nextPhase;
            }
        }
}

//让当前线程先自旋一段时间,如果自旋一段时间其余参与者还未到达阶段屏障,封装成节点,加入到等待链表中,等待被唤醒
private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        ////唤醒所有其他等待其他参与者都到达传入阶段的上一阶段屏障的参与者线程
        releaseWaiters(phase-1);          // ensure old queue clean
        //节点是否加入到链表的标志位,true为节点已加入到链表中
        boolean queued = false;           // true when node is enqueued
        //记录最近的unarrived值,即最近还有几个参与者还未到达阶段屏障值
        int lastUnarrived = 0;            // to increase spins upon change
        //线程自旋的次数
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        //循环,直到传入进来的阶段值和当前Phaser实例的阶段值不相等,表明最后一个参与者已经到达阶段屏障,其余等待参与者线程需被唤醒 
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            //如果传入的节点为空
            if (node == null) {           // spinning in noninterruptible mode
                //从属性state中获取到低16位的unarrived值
                int unarrived = (int)s & UNARRIVED_MASK;
                //如果unarrived值和lastUnarrived值不相等,表明又有参与者到达阶段屏障
                if (unarrived != lastUnarrived &&
                    //如果unarrived小于NCPU,即还未到达的阶段屏障的参与者数目小于可用的处理器个数
                    (lastUnarrived = unarrived) < NCPU)
                    //把自旋值再加上SPINS_PER_ARRIVAL
                    spins += SPINS_PER_ARRIVAL;
                //获取当前线程是否被中断 
                boolean interrupted = Thread.interrupted();
                //如果线程被中断,或者已经自旋完
                if (interrupted || --spins < 0) { // need node to record intr
                    //创建新的节点,QNode可以看上面的介绍
                    node = new QNode(this, phase, false, false, 0L);
                    //将线程是否被中断标志位赋值给节点
                    node.wasInterrupted = interrupted;
                }
            }
            //判断节点是否被释放,如节点线程被中断,或者等待其他参与者到达超时,节点的isReleasable方法可以看上面介绍
            else if (node.isReleasable()) // done or aborted
                //如果节点被释放,退出循环
                break;
            //如果节点还未加入到链表中,将新建节点加入到链表中 
            else if (!queued) {           // push onto queue
                //根据传入进来的阶段值获取操作的是偶链表还是奇链表
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                //获取头节点,做为新建节点的下一节点,新建节点做为链表新的头节点
                QNode q = node.next = head.get();
                //如果头节点为空空链表,或者节点的阶段值和传入进来的阶段值相等,并且传入进来的阶段值和当前Phaser实例的阶段值相等
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    //使用CAS将新建节点做为头节点
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    //使用ForkJoinPool的managedBlock方法让节点node对应的线程进入等待状态 
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    //节点线程在等待其余参与者到达阶段屏障的过程中被其他线程中断,将节点标识线程是否被中断wasInterrupted 置为true
                    node.wasInterrupted = true;
                }
            }
        }
        
        //如果节点不为空     
        if (node != null) {
            //如果节点线程不为空,表明线程不是被正常唤醒,有可能是被中断唤醒
            if (node.thread != null)
                //将当前节点线程置为空 
                node.thread = null;       // avoid need for unpark()
            //节点线程在等待其余参与者到达阶段屏障的过程中被其他线程中断,并且节点不支持线程中断,由于线程在被其他线程中断唤醒,抛出InterruptedException异常,线程的中断标志位被重置
            if (node.wasInterrupted && !node.interruptible)
                //线程在被其他线程中断唤醒,抛出InterruptedException异常,线程的中断标志位被重置,为此当前线程需要再次调用interrupt()保留中断标志位
                Thread.currentThread().interrupt();
            //如果线程不是被最后一个参与者到达阶段屏障唤醒,即当前Phaser实例的阶段值和传入的阶段值相等
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                //将唤醒和当前Phaser实例阶段值不相等的节点,abortWait看下面方法的介绍
                return abortWait(phase); // possibly clean up on abort
        }
        //唤醒所有其他等待其他参与者都到达阶段屏障的参与者线程,releaseWaiters方法看上面介绍 
        releaseWaiters(phase);
        //返回传入的阶段值 
        return p;
}

//ForkJoinPool的managedBlock方法,阻塞线程,直到线程被唤醒
public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        Thread t = Thread.currentThread();
        //if这部分等分析ForkJoinPool再解析
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
            WorkQueue w = wt.workQueue;
            while (!blocker.isReleasable()) {
                if (p.tryCompensate(w)) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        U.getAndAddLong(p, CTL, AC_UNIT);
                    }
                    break;
                }
            }
        }
        else {
            //循环调用节点QNode的isReleasable判断节点是否被释放,QNode的block方法让线程进入等待状态,不清楚的可以看QNode内部的这两个方法介绍  
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
}

//传入阶段值,唤醒和当前Phaser实例阶段值不相等的节点
private int abortWait(int phase) {
        //根据传入进来的阶段值获取操作的是偶链表还是奇链表 
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        //循环,直到唤醒所有等待在传入进来的phase阶段值的节点线程 
        for (;;) {
            Thread t;
            //获取链表头节点 
            QNode q = head.get();
            //root Phaser实例state属性高32位的低31位存放phase,为此需要右移32位得到phase值
            int p = (int)(root.state >>> PHASE_SHIFT);
            //如果链表中没有等待节点,或者头节点线程不为空和节点阶段值和当前Phaser实例阶段值相等,直接返回+
            if (q == null || ((t = q.thread) != null && q.phase == p))
                return p;
            if (head.compareAndSet(q, q.next) && t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
}

public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
}

public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
}

public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
}复制代码

八、关闭屏障

public void forceTermination() {
        // Only need to change root state
        final Phaser root = this.root;
        long s;
        while ((s = root.state) >= 0) {
            if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                          s, s | TERMINATION_BIT)) {
                // signal all threads
                releaseWaiters(0); // Waiters on evenQ
                releaseWaiters(1); // Waiters on oddQ
                return;
            }
        }
}复制代码

九、其他方法

public final int getPhase() {
        return (int)(root.state >>> PHASE_SHIFT);
}

public int getRegisteredParties() {
        return partiesOf(state);
}

public int getArrivedParties() {
        return arrivedOf(reconcileState());
}

public int getUnarrivedParties() {
        return unarrivedOf(reconcileState());
}

public Phaser getParent() {
        return parent;
}

public Phaser getRoot() {
        return root;
}

public boolean isTerminated() {
        return root.state < 0L;
}

public String toString() {
        return stateToString(reconcileState());
}复制代码


关注下面的标签,发现更多相似文章
评论