Chapter 4 Synchronization Utilities

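Besides locks and Condition, the Concurrent package (java.util.concurrent) also provides a set of synchronization utility classes. Some of them are built on AQS; others rely on dedicated implementation mechanisms.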

Semaphore

Semaphore is a counting semaphore that controls concurrent access to a fixed number of resources. Using it is simple:

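        // 10 shared resources initially; the second argument selects fair mode
        Semaphore semaphore = new Semaphore(10, true);

        try {
            // Take one permit; if none is available, the thread blocks
            semaphore.acquire();
            try {
                // use the resource
            } finally {
                // Release only after a successful acquire
                semaphore.release();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: no permit was taken, so nothing to release
            Thread.currentThread().interrupt();
        }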

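As shown in the figure, suppose n threads (n > 10) try to acquire resources from the Semaphore. Only 10 of them succeed; the others block until some thread releases a resource, at which point a blocked thread can acquire it.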

Multiple threads accessing a Semaphore
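To see the limit in action, here is a small demo. It is a sketch, not from the original text; the class name SemaphoreLimitDemo and the numbers (3 permits, 20 tasks) are arbitrary choices. It counts how many tasks hold a permit at the same moment and reports the peak, which should never exceed the number of permits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreLimitDemo {
    /** Runs `tasks` tasks that each hold one permit briefly; returns the peak number of simultaneous holders. */
    static int peakConcurrency(int permits, int tasks) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits, true);   // fair mode, as in the example above
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks); // one thread per task
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();                      // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                                   // no permit taken, so nothing to release
                }
                try {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    Thread.sleep(10);                         // simulate work while holding the permit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();                      // lets one blocked thread proceed
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS))
            throw new IllegalStateException("tasks did not finish in time");
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak = " + peakConcurrency(3, 20));
    }
}
```

The `active` counter is incremented only after `acquire()` returns and decremented before `release()`, so its value is bounded by the permit count.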

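When the initial number of resources is 1, a Semaphore degenerates into an exclusive lock (although, unlike ReentrantLock, it has no owner and is not reentrant). For this reason, Semaphore is implemented much like a lock: it is based on AQS and comes in fair and non-fair variants. The class hierarchy around Semaphore is shown in the figure.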

Class hierarchy around Semaphore

public class Semaphore implements java.io.Serializable {
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
}

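Semaphore works essentially the same way as a lock. The total number of resources is the initial value of state. acquire() decrements state with CAS; if not enough permits are left (state would drop below 0), the thread blocks. release() increments state with CAS and wakes blocked threads.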
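The CAS-on-state idea can be sketched as a tiny counting semaphore on AQS shared mode. This is a hypothetical class (MiniSemaphore), not the JDK source. It models only the non-fair path; a fair variant would additionally return -1 from tryAcquireShared when hasQueuedPredecessors() is true.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical minimal non-fair counting semaphore on AQS shared mode (not the JDK class). */
final class MiniSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }          // state = number of available permits

        int available() { return getState(); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative result => not enough permits: AQS enqueues and parks the caller.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;                          // true => AQS wakes queued waiters
            }
        }
    }

    private final Sync sync;

    MiniSemaphore(int permits) { sync = new Sync(permits); }

    void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; } // never blocks

    void release() { sync.releaseShared(1); }

    int availablePermits() { return sync.available(); }
}
```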

CountDownLatch

Use cases for CountDownLatch

Consider a scenario where the main thread must wait for 10 worker threads to finish before it exits. CountDownLatch handles this directly:

		CountDownLatch doneSignal = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                // do something
                doneSignal.countDown();
            }).start();
        }
        try {
            doneSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

The figure shows the class hierarchy around CountDownLatch. Its implementation is similar to Semaphore's: it is also based on AQS, but there is no fair/non-fair distinction.

Class hierarchy around CountDownLatch

Analysis of await()

As shown below, await() calls the AQS template method acquireSharedInterruptibly(), which was introduced earlier; CountDownLatch.Sync overrides tryAcquireShared.

public class CountDownLatch {
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    }
}
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
}

From the implementation of tryAcquireShared(..), as long as state != 0, a thread calling await() is placed into the AQS wait queue and blocks.

Analysis of countDown()

public class CountDownLatch {
    public void countDown() {
        sync.releaseShared(1);
    }
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
}
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

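countDown() calls the AQS template method releaseShared(), and CountDownLatch.Sync overrides the tryReleaseShared(..) it invokes. As the code shows, tryReleaseShared(..) returns true only when this call brings state from 1 down to 0 (if state is already 0, it returns false). Only then is doReleaseShared() executed; it wakes the first waiter, and the wake-up propagates in shared mode so that all threads blocked in the queue are released.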

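To sum up: because it is built on the AQS wait queue, many threads can block waiting for state to reach 0. countDown() keeps decrementing state, and when it reaches 0 all waiting threads are woken at once. As shown in the figure, if the initial count is M, N threads call await() and M threads call countDown(); once the count reaches 0, all N threads are woken.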

Multiple threads blocked in await()
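The two overrides above are enough to build a working latch. The following is a hypothetical MiniLatch that mirrors the CountDownLatch.Sync code shown earlier; it is not the JDK class. Its wakeAll helper reproduces the N-waiters / M-countDown scenario: it returns how many waiters got through.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical minimal latch on AQS shared mode, following the CountDownLatch.Sync code above. */
final class MiniLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }             // state = remaining count

        int count() { return getState(); }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;             // negative => enqueue and park
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;                // already open: nothing to signal
                int next = c - 1;
                if (compareAndSetState(c, next))
                    return next == 0;                    // only the 1 -> 0 transition wakes waiters
            }
        }
    }

    private final Sync sync;

    MiniLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    void countDown() { sync.releaseShared(1); }

    long getCount() { return sync.count(); }

    /** N waiters block in await() until M countDown() calls bring the count to 0. */
    static int wakeAll(int waiters, int count) throws InterruptedException {
        MiniLatch latch = new MiniLatch(count);
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < count; i++)
            new Thread(latch::countDown).start();
        for (Thread t : threads) t.join();
        return released.get();
    }
}
```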

CyclicBarrier

Use cases for CyclicBarrier

CyclicBarrier is also simple to use:

		CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

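Consider this scenario: 10 engineers come to a company for a job application that consists of a written test and an interview. First, they wait until everyone has arrived and then start the written test together; after the test, they go to the interview together. Treating the 10 people as 10 threads, the synchronization among them is shown in Figure 4-5.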

Synchronization among the 10 threads

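There are 2 synchronization points in the whole process. At the first, all candidates wait until everyone has arrived at the company, then start the written test together. At the second, they wait until everyone has finished the test, then move on to the interview together. In each thread's run() method, this corresponds to the following pseudocode: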

(Figure: pseudocode of each thread's run() method)
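Because the original pseudocode survives only as an image, here is a runnable reconstruction. It is a sketch under assumptions: the class name InterviewDemo and the printed messages are made up. Each thread calls await() once per synchronization point. The barrierAction counts how often it runs, which should be once per point.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

final class InterviewDemo {
    /** Runs `candidates` threads through two sync points; returns how many times barrierAction ran. */
    static int run(int candidates) throws InterruptedException {
        AtomicInteger actionRuns = new AtomicInteger();
        // The barrierAction runs once per generation, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(candidates,
                () -> System.out.println("round " + actionRuns.incrementAndGet() + ": everyone is here"));
        Thread[] threads = new Thread[candidates];
        for (int i = 0; i < candidates; i++) {
            int id = i;
            threads[i] = new Thread(() -> {
                try {
                    // arrive at the company
                    barrier.await();     // sync point 1: wait until everyone has arrived
                    // take the written test
                    barrier.await();     // sync point 2: wait until everyone has finished the test
                    // go to the interview
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    System.out.println("candidate " + id + ": barrier broken");
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return actionRuns.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrierAction ran " + run(10) + " times");
    }
}
```

The same barrier object serves both synchronization points, which is what "cyclic" refers to.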

How CyclicBarrier works

CyclicBarrier is implemented with ReentrantLock + Condition.
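Before reading the real source, it may help to see the core of the ReentrantLock + Condition approach in isolation. SimpleBarrier below is a hypothetical, stripped-down sketch. It has no broken state, no timeouts and no barrierAction, so unlike CyclicBarrier it does not recover if a waiter is interrupted. It keeps the generation idea: a waiter sleeps until the generation object it saw has been replaced.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stripped-down reusable barrier: no broken state, no timeouts, no barrierAction. */
final class SimpleBarrier {
    private static final class Generation {}     // identity object marking one round

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;                            // parties still missing in this round
    private Generation generation = new Generation();

    SimpleBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    /** Returns the arrival index: parties - 1 for the first arrival, 0 for the last. */
    int await() throws InterruptedException {
        lock.lock();
        try {
            Generation g = generation;
            int index = --count;
            if (index == 0) {                    // last arrival trips the barrier
                count = parties;                 // reset for the next round
                generation = new Generation();
                trip.signalAll();
                return 0;
            }
            while (g == generation)              // loop guards against spurious wakeups
                trip.await();
            return index;
        } finally {
            lock.unlock();
        }
    }

    /** Runs `parties` threads through `rounds` rounds on one barrier; returns how many finished. */
    static int runRounds(int parties, int rounds) throws InterruptedException {
        SimpleBarrier barrier = new SimpleBarrier(parties);
        AtomicInteger finished = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) barrier.await(); // same barrier reused
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return finished.get();
    }
}
```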

public class CyclicBarrier {
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;
}

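Start with the constructor. Besides the total number of parties, it can also take a callback, barrierAction. barrierAction is executed once each time all parties have arrived at the barrier, before the waiting threads are woken.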

	public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

Next, let's look at how await() is implemented.

public class CyclicBarrier {
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
}

A few notes on the function above:

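  1. CyclicBarrier can be reused. Take the interview scenario from the previous section: 10 threads wait for one another and, once all have arrived, are woken together to carry on with their own logic; then the same 10 threads wait for one another again and are woken together again. Each round is called a Generation, i.e., one synchronization point.
  2. CyclicBarrier responds to interrupts. If a thread is interrupted before all 10 threads have arrived, breakBarrier() above marks the current generation as broken, resets count to its initial value (parties) and wakes all blocked threads, which then throw BrokenBarrierException. The barrier stays broken (later await() calls also throw BrokenBarrierException) until reset() is called to start a new generation.
  3. The barrierAction callback is executed only once per generation, by the last (10th) thread to arrive, before it wakes the other 9 threads; it is not executed once by each of the 10 threads.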
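Point 2 can be observed without racing threads, because a timed await() that expires takes the same breakBarrier() path in dowait() as an interrupt. The following demo is a sketch; BrokenBarrierDemo is a made-up name. Only one of the two parties ever shows up, so the barrier breaks. A further await() fails fast, and reset() makes the barrier usable again.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BrokenBarrierDemo {
    /** Returns {timed out and broken, second await threw BrokenBarrierException, broken after reset}. */
    static boolean[] run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);     // the second party never shows up
        boolean timedOut = false;
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);     // expires -> breakBarrier()
        } catch (TimeoutException e) {
            timedOut = true;
        } catch (BrokenBarrierException e) {
            throw new AssertionError("not expected on the first await", e);
        }
        boolean brokenAfterTimeout = barrier.isBroken();

        boolean secondThrew = false;
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);     // g.broken is checked first: fails fast
        } catch (BrokenBarrierException e) {
            secondThrew = true;
        } catch (TimeoutException e) {
            // not expected: a broken barrier does not wait
        }

        barrier.reset();                                  // starts a fresh, unbroken generation
        return new boolean[] {timedOut && brokenAfterTimeout, secondThrew, barrier.isBroken()};
    }
}
```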

Exchanger

Use cases

Exchanger lets threads exchange data. It is used through a single exchange(..) method, for example:

(Figure: Exchanger usage example)

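In the example above, 4 threads call exchange(..) concurrently and exchange data in pairs, e.g. A/B, C/D, A/C, B/D, A/D or B/C; which pairs form depends on timing.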
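With exactly two threads, the pairing is deterministic, which makes the exchange(..) semantics easy to see. ExchangerDemo and the payload strings are illustrative names, not from the original example. Each thread blocks in exchange(..) until its partner arrives, then receives the partner's object.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicReference;

final class ExchangerDemo {
    /** Two threads swap strings; returns {what A received, what B received}. */
    static String[] swap() throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        AtomicReference<String> gotByA = new AtomicReference<>();
        AtomicReference<String> gotByB = new AtomicReference<>();
        Thread a = new Thread(() -> {
            try {
                gotByA.set(exchanger.exchange("data from A")); // blocks until B arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread b = new Thread(() -> {
            try {
                gotByB.set(exchanger.exchange("data from B")); // blocks until A arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        a.start();
        b.start();
        a.join();
        b.join();
        return new String[] {gotByA.get(), gotByB.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] r = swap();
        System.out.println("A got: " + r[0] + ", B got: " + r[1]);
    }
}
```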

Implementation (TODO)

TODO

Phaser

Replacing CyclicBarrier and CountDownLatch with Phaser

JDK 7 added a new synchronization utility, Phaser, which is more powerful than both CyclicBarrier and CountDownLatch.

Replacing CountDownLatch with Phaser

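Recall the CountDownLatch example: one main thread must wait for 10 worker threads to finish before it can continue. Phaser can do the same. CountDownLatch mainly uses two methods, await() and countDown(); their Phaser counterparts are awaitAdvance(int phase) and arrive().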

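        Phaser phaser = new Phaser(10);
        // Read the phase before starting the workers: if all 10 arrived first,
        // getPhase() would already return 1 and awaitAdvance(1) would never return
        int phase = phaser.getPhase();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                // do something
                phaser.arrive();
            }).start();
        }
        // Block until phase `phase` is over, i.e. all 10 workers have arrived
        phaser.awaitAdvance(phase);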

Replacing CyclicBarrier with Phaser

The earlier CyclicBarrier example, with 10 engineers applying for jobs at a company, can also be implemented with Phaser; the code is almost the same.

(Figure: Phaser version of the interview example)

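arriveAndAwaitAdvance() is a combination of arrive() and awaitAdvance(int). It means: "I have reached this synchronization point, and I will wait until everyone has reached it; then we all move on together."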
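Since the figure with the Phaser version is lost, here is a sketch of what it might look like. PhaserInterviewDemo is a made-up name. Overriding onAdvance to count advances is an illustrative choice: like barrierAction, onAdvance runs once per phase when the last party arrives. Returning false keeps the Phaser from terminating.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

final class PhaserInterviewDemo {
    /** Returns {final phase number, number of onAdvance calls}. */
    static int[] run(int candidates) throws InterruptedException {
        AtomicInteger advances = new AtomicInteger();
        Phaser phaser = new Phaser(candidates) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                advances.incrementAndGet();     // once per phase, like CyclicBarrier's barrierAction
                return false;                   // false => do not terminate the phaser
            }
        };
        Thread[] threads = new Thread[candidates];
        for (int i = 0; i < candidates; i++) {
            threads[i] = new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // sync point 1: everyone has arrived
                // take the written test
                phaser.arriveAndAwaitAdvance(); // sync point 2: everyone has finished the test
                // go to the interview
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return new int[] {phaser.getPhase(), advances.get()};
    }
}
```

Note that arriveAndAwaitAdvance() is not interruptible, so no InterruptedException handling is needed inside the threads.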

New features of Phaser

Feature 1: dynamically adjusting the number of threads

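The number of threads a CyclicBarrier synchronizes is fixed in the constructor and cannot be changed afterwards, whereas a Phaser can adjust the number of threads to synchronize at runtime. Phaser provides the following methods to increase or decrease that number.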

public class Phaser {
    public int register() {
        return doRegister(1);
    }
    
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }
    
    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }
}
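A single-threaded walk-through shows how these methods change the counts. The class name PhaserRegistrationDemo and the numbers are arbitrary. arriveAndDeregister() removes a party for good, so it reduces the registered count instead of adding to the arrived count.

```java
import java.util.concurrent.Phaser;

final class PhaserRegistrationDemo {
    /** Returns {registered, unarrived, arrived} after deregistering, then the same after one arrive(), then the phase. */
    static int[] run() {
        Phaser phaser = new Phaser(1);        // 1 party
        phaser.register();                    // 2 parties
        phaser.bulkRegister(3);               // 5 parties
        phaser.arriveAndDeregister();         // one party leaves: 4 registered, 4 unarrived
        int registered = phaser.getRegisteredParties();
        int unarrived = phaser.getUnarrivedParties();
        int arrived = phaser.getArrivedParties();
        phaser.arrive();                      // one of the remaining 4 arrives
        return new int[] {registered, unarrived, arrived,
                          phaser.getRegisteredParties(), phaser.getUnarrivedParties(),
                          phaser.getArrivedParties(), phaser.getPhase()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```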

Tiered Phasers

Multiple Phasers can be organized into a tree, as shown in the figure, by passing the parent Phaser to the constructor.

A tree of Phasers

First, a quick look at how Phaser stores the tree structure internally:

public class Phaser {
    /**
     * The parent of this phaser, or null if none
     */
    private final Phaser parent;
}

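As you can see, each Phaser records its parent but keeps no list of its children. So each Phaser knows who its parent is, but a parent does not know how many children it has; operations on the parent are performed through its children.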

How is a tree of Phasers used? The following code builds the tree shown in the figure.

		Phaser root = new Phaser(2);
        Phaser c1 = new Phaser(root, 3);
        Phaser c2 = new Phaser(root, 2);
        Phaser c3 = new Phaser(c1, 0);

The Phaser tree built by the code

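root starts with 2 parties. Two child Phasers, c1 and c2, are then attached to it; each child counts as one party, so root ends up with 2+2=4 parties. c1 starts with 3 parties. c3 is created with 0 parties, so (per the constructor code, which only calls parent.doRegister(1) when parties != 0) it does not register with c1 yet; once parties are added to c3 via register(), c3 registers itself with c1 and c1's count becomes 3+1=4.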

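Each node in the tree can be treated as an independent Phaser that works exactly like a standalone one. Specifically, a parent Phaser does not need to be aware of its children: when the number of parties registered in a child Phaser becomes greater than 0, the child registers itself with its parent; when it drops to 0, the child automatically deregisters from its parent. The parent simply treats each child Phaser as one ordinary participating thread.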
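The counts in this tree can be checked directly. PhaserTreeDemo is a made-up name; getParent(), getRoot() and getRegisteredParties() are standard Phaser methods. The demo shows that c3 joins c1 only when it gets its first party, and that root's count is unaffected because c1 was already one of root's parties.

```java
import java.util.concurrent.Phaser;

final class PhaserTreeDemo {
    /** Returns {root parties, c1 before, c1 after c3.register(), c3 parties, links ok (1/0)}. */
    static int[] run() {
        Phaser root = new Phaser(2);
        Phaser c1 = new Phaser(root, 3);     // registers itself with root as one party
        Phaser c2 = new Phaser(root, 2);     // likewise
        Phaser c3 = new Phaser(c1, 0);       // 0 parties: not registered with c1 yet
        int rootParties = root.getRegisteredParties();  // 2 + c1 + c2
        int c1Before = c1.getRegisteredParties();
        c3.register();                       // first party: c3 now registers with c1
        int c1After = c1.getRegisteredParties();
        int c3Parties = c3.getRegisteredParties();
        boolean linksOk = c3.getParent() == c1 && c3.getRoot() == root && c2.getRoot() == root;
        return new int[] {rootParties, c1Before, c1After, c3Parties, linksOk ? 1 : 0};
    }
}
```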

The state variable

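With the usage and new features of Phaser in mind, let's dig into its implementation. Phaser is not built on AQS, but it has AQS's core ingredients: a state variable, CAS operations and a wait queue. We start with the state variable.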

public class Phaser {
    /**
     * Primary state representation, holding four bit-fields:
     *
     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
     * parties    -- the number of parties to wait            (bits 16-31)
     * phase      -- the generation of the barrier            (bits 32-62)
     * terminated -- set if barrier is terminated             (bit  63 / sign)
     *
     * Except that a phaser with no registered parties is
     * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived parties (encoded as EMPTY below).
     *
     * To efficiently maintain atomicity, these values are packed into
     * a single (atomic) long. Good performance relies on keeping
     * state decoding and encoding simple, and keeping race windows
     * short.
     *
     * All state updates are performed via CAS except initial
     * registration of a sub-phaser (i.e., one with a non-null
     * parent).  In this (relatively rare) case, we use built-in
     * synchronization to lock while first registering with its
     * parent.
     *
     * The phase of a subphaser is allowed to lag that of its
     * ancestors until it is actually accessed -- see method
     * reconcileState.
     */
    private volatile long state;
}

This 64-bit state variable is split into 4 fields; Figure 4-9 shows what each field means.

Meaning of each field of the state variable

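The highest bit (the sign bit) is the termination flag: 0 means the Phaser has not terminated, 1 means it has terminated. It is initially 0.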

Phaser provides a set of methods that extract the numbers shown in the figure from state:

public class Phaser {
    private static final int  PHASE_SHIFT     = 32;
    public final int getPhase() {
        return (int)(root.state >>> PHASE_SHIFT);
    }
    
    public boolean isTerminated() {
        return root.state < 0L;
    }
    
    public int getRegisteredParties() {
        return partiesOf(state);
    }
    private static final int  PARTIES_SHIFT   = 16;
    private static int partiesOf(long s) {
        return (int)s >>> PARTIES_SHIFT;
    }
    
    public int getUnarrivedParties() {
        return unarrivedOf(reconcileState());
    }
    private static final int  UNARRIVED_MASK  = 0xffff;
    private static int unarrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
    }
}

Next, let's see how state is assigned in the constructor:

public class Phaser {
    public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            this.evenQ = new AtomicReference<QNode>();
            this.oddQ = new AtomicReference<QNode>();
        }
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }
}	

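When parties == 0, state is set to the constant EMPTY, whose value is 1: zero parties with one unarrived party, an otherwise illegal combination used to mark "no registered parties".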

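When parties != 0, phase is shifted left by 32 bits and parties by 16 bits, and parties is also used as the lowest 16 bits (the unarrived count); the three values are ORed together and assigned to state. This assignment mirrors the layout in the figure above.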
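To make the packing concrete, here is a hypothetical helper, PhaserStateBits, that mirrors the layout. It is not part of the JDK API. The constants match the values quoted in this chapter: PHASE_SHIFT 32, PARTIES_SHIFT 16, UNARRIVED_MASK 0xffff and EMPTY 1, plus a TERMINATION_BIT for bit 63. It encodes a fresh state the way the constructor does and decodes the fields the way getPhase(), partiesOf() and unarrivedOf() do.

```java
/** Hypothetical helper mirroring the Phaser state layout described above (not part of the JDK API). */
final class PhaserStateBits {
    static final int  PARTIES_SHIFT   = 16;
    static final int  PHASE_SHIFT     = 32;
    static final int  UNARRIVED_MASK  = 0xffff;
    static final long TERMINATION_BIT = 1L << 63;
    static final int  EMPTY           = 1;      // 0 parties, 1 unarrived: the "no parties" marker

    /** Same packing as the Phaser(Phaser, int) constructor. */
    static long initial(int phase, int parties) {
        if (parties == 0) return EMPTY;
        return ((long) phase << PHASE_SHIFT) | ((long) parties << PARTIES_SHIFT) | parties;
    }

    static int phaseOf(long s)     { return (int) (s >>> PHASE_SHIFT); }  // negative if terminated
    static int partiesOf(long s)   { return (int) s >>> PARTIES_SHIFT; }
    static int unarrivedOf(long s) {
        int counts = (int) s;
        return counts == EMPTY ? 0 : counts & UNARRIVED_MASK;
    }
    static boolean isTerminated(long s) { return s < 0L; }                // sign bit = termination flag

    public static void main(String[] args) {
        long s = initial(5, 3);
        System.out.println(Long.toHexString(s));                              // 500030003
        System.out.println(phaseOf(s) + " " + partiesOf(s) + " " + unarrivedOf(s)); // 5 3 3
        System.out.println(isTerminated(s | TERMINATION_BIT));                 // true
    }
}
```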

Blocking and waking (Treiber Stack)

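On top of this state variable, Phaser performs CAS operations and the corresponding blocking and waking. As shown in the figure, the main thread on the right calls awaitAdvance() and blocks; arrive() on the left decrements state with CAS, and when the number of unarrived parties reaches 0, the blocked main thread on the right is woken.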

Blocking and waking based on CAS on state

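Here, blocked threads are kept in a data structure called a Treiber Stack rather than in AQS's doubly linked list. A Treiber Stack is a lock-free stack first described by R. Kent Treiber in his 1986 paper Systems Programming: Coping with Parallelism. It is a singly linked list in which both push and pop happen at the head, so it needs only a head pointer and no tail pointer. The implementation is shown below.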

public class Phaser {
    /**
     * Wait nodes for Treiber stack representing wait queue
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final boolean interruptible;
        final boolean timed;
        boolean wasInterrupted;
        long nanos;
        final long deadline;
        volatile Thread thread; // nulled to cancel wait
        QNode next;
    }

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention when releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     * Subphasers share queues with root to speed up releases.
     */
    private final AtomicReference<QNode> evenQ;
    private final AtomicReference<QNode> oddQ;
}	

To reduce contention, two linked lists, i.e., two Treiber Stacks, are used: when the phase is odd, blocked threads go into oddQ; when it is even, they go into evenQ. The code is as follows.

public class Phaser {
    private AtomicReference<QNode> queueFor(int phase) {
        return ((phase & 1) == 0) ? evenQ : oddQ;
    }
}	
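For reference, here is a generic Treiber stack reduced to its essentials. TreiberStack is an illustrative class, not Phaser's internal code. It has a single head pointer, and push and pop each retry a CAS on that head until no other thread has changed it in between. Because every push allocates a fresh node and Java is garbage-collected, the classic ABA problem does not arise here.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Classic Treiber stack: a lock-free LIFO with a single CAS-updated head pointer. */
final class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    void push(E item) {
        Node<E> node = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            node.next = oldHead;                       // link before publishing
        } while (!head.compareAndSet(oldHead, node));  // retry if another thread moved head
    }

    /** Returns the most recently pushed item, or null if the stack is empty. */
    E pop() {
        Node<E> oldHead;
        do {
            oldHead = head.get();
            if (oldHead == null) return null;
        } while (!head.compareAndSet(oldHead, oldHead.next));
        return oldHead.item;
    }

    /** Pushes concurrently from several threads, then pops everything; returns the pop count. */
    static int concurrentPushCount(int threads, int perThread) throws InterruptedException {
        TreiberStack<Integer> stack = new TreiberStack<>();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) stack.push(j);
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        int popped = 0;
        while (stack.pop() != null) popped++;
        return popped;
    }
}
```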

Analysis of arrive()

Now let's see how arrive() operates on the state variable and how it wakes threads.

public class Phaser {
    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }
    
    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }
}	

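Both arrive() and arriveAndDeregister() call doArrive(int adjust) internally. The difference is that the former only decrements the "number of unarrived parties" by 1, while the latter decrements both the number of unarrived parties and the number of registered parties (the total for the next round) by 1. Here is the implementation of doArrive(int):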

public class Phaser {
    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * @param adjust value to subtract from state;
     *               ONE_ARRIVAL for arrive,
     *               ONE_DEREGISTER for arriveAndDeregister
     */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
}	

Some notes on this function:

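  1. The relevant constants are defined below. For arrive(), only the lowest 16 bits need to be decremented, so adjust = ONE_ARRIVAL; for arriveAndDeregister(), both 16-bit fields in the low 32 bits are decremented, so adjust = ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY.

    public class Phaser {
        private static final int  PARTIES_SHIFT   = 16;
    
        private static final int  ONE_ARRIVAL     = 1;
        private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
        private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    }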
    
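  2. The number of unarrived parties is decremented by 1. If it has not reached 0 afterwards, nothing else happens and the method returns. If it reaches 0 and this is the root Phaser, two things happen: first, state is reset so that the unarrived count goes back to the total number of registered parties and phase is incremented by 1 (or the termination bit is set if onAdvance() returns true); second, the threads waiting in the queue are woken. A child Phaser instead arrives at its parent, or deregisters from it if no parties remain.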
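The arithmetic of doArrive() on the packed state can be replayed without any threads. ArriveArithmetic below is a hypothetical, root-only simplification. It has no CAS retry loop, no onAdvance()/termination handling, no parent propagation and no waking. It follows the same steps: subtract adjust; if this was the last unarrived party, keep only the parties field, copy it into the unarrived field, and advance the phase.

```java
/** Hypothetical root-only simulation of doArrive()'s arithmetic on a packed state (no CAS, no waking). */
final class ArriveArithmetic {
    static final int  PARTIES_SHIFT  = 16;
    static final int  PHASE_SHIFT    = 32;
    static final int  UNARRIVED_MASK = 0xffff;
    static final long PARTIES_MASK   = 0xffff0000L;
    static final int  MAX_PHASE      = Integer.MAX_VALUE;
    static final int  EMPTY          = 1;
    static final int  ONE_ARRIVAL    = 1;
    static final int  ONE_PARTY      = 1 << PARTIES_SHIFT;
    static final int  ONE_DEREGISTER = ONE_ARRIVAL | ONE_PARTY;

    static long pack(int phase, int parties, int unarrived) {
        return ((long) phase << PHASE_SHIFT) | ((long) parties << PARTIES_SHIFT) | unarrived;
    }

    /** Returns the state after one arrival with the given adjust (ONE_ARRIVAL or ONE_DEREGISTER). */
    static long arrive(long s, int adjust) {
        int counts = (int) s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0) throw new IllegalStateException("no unarrived parties");
        s -= adjust;                                      // unarrived - 1 (and parties - 1 on deregister)
        if (unarrived != 1) return s;                     // others still missing: nothing more to do
        long n = s & PARTIES_MASK;                        // keep only the parties field
        int nextUnarrived = (int) n >>> PARTIES_SHIFT;    // next round waits for all parties again
        // A real root Phaser would call onAdvance() here; with its default, 0 parties means termination.
        n |= (nextUnarrived == 0) ? EMPTY : nextUnarrived;
        int nextPhase = ((int) (s >>> PHASE_SHIFT) + 1) & MAX_PHASE;
        return n | ((long) nextPhase << PHASE_SHIFT);     // advance the phase
    }
}
```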

Here is the wake-up function:

public class Phaser {
    /**
     * Removes and signals threads from queue for phase.
     */
    private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        Thread t;  // its thread
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }
}	

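Starting from the head, nodes are popped off the stack as long as the head node's phase differs from the root Phaser's current phase: such a node belongs not to the current round but to the previous one, so it is removed and its thread is woken.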

Next, let's see how threads are blocked.

Analysis of awaitAdvance()

public class Phaser {
    /**
     * Awaits the phase of this phaser to advance from the given phase
     * value, returning immediately if the current phase is not equal
     * to the given phase value or this phaser is terminated.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
     * @return the next arrival phase number, or the argument if it is
     * negative, or the (negative) {@linkplain #getPhase() current phase}
     * if terminated
     */
    public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
    }
}	
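The early returns in awaitAdvance() can be checked in a single thread. AwaitAdvanceDemo is an illustrative name. With one registered party, arrive() returns the arrival phase 0 and immediately advances the Phaser to phase 1. Waiting on the stale phase 0 therefore returns 1 at once, and a negative argument is returned unchanged.

```java
import java.util.concurrent.Phaser;

final class AwaitAdvanceDemo {
    /** Returns {phase returned by arrive(), current phase, awaitAdvance(stale), awaitAdvance(-1)}. */
    static int[] run() {
        Phaser phaser = new Phaser(1);                   // a single party
        int arrivedAt = phaser.arrive();                 // last (only) arrival: phase 0 -> 1
        int fromStale = phaser.awaitAdvance(arrivedAt);  // current phase != 0: returns without blocking
        int fromNegative = phaser.awaitAdvance(-1);      // negative argument is returned as-is
        return new int[] {arrivedAt, phaser.getPhase(), fromStale, fromNegative};
    }
}
```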

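The while loop below has 4 branches. Initially node == null, so the first branch is taken and the thread spins; once the spin budget is used up (or an interrupt is detected), a QNode is created. After that, the third and fourth branches push the node onto the stack and block the thread, respectively. The second branch leaves the loop once the node is releasable, i.e. the phase has advanced or the wait was aborted.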

public class Phaser {
    /**
     * Possibly blocks and waits for phase to advance unless aborted.
     * Call only on root phaser.
     *
     * @param phase current phase
     * @param node if non-null, the wait node to track interrupt and timeout;
     * if null, denotes noninterruptible wait
     * @return current phase
     */
    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        releaseWaiters(phase);
        return p;
    }
}	

Here ForkJoinPool.managedBlock(ManagedBlocker blocker) is called to block the thread associated with node. ManagedBlocker is an interface in ForkJoinPool, defined as follows:

public static interface ManagedBlocker {
	boolean block() throws InterruptedException;
    boolean isReleasable();
}

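QNode implements this interface, and underneath it still relies on park(), as shown below. Instead of calling park()/unpark() directly, blocking and waking are wrapped in the ManagedBlocker layer mainly for convenience: park() may return because of an interrupt, and there is also the timed variant of park(); both cases are handled in one place.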

public class Phaser {
    static final class QNode implements ForkJoinPool.ManagedBlocker {
    	public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    nanos = deadline - System.nanoTime();
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }

        public boolean block() {
            if (isReleasable())
                return true;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }
}	
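The same pattern can be reused outside Phaser. FlagBlocker below is a hypothetical one-shot blocker in the style of QNode. Like QNode, it remembers the thread that created it, parks in block() until a flag is set, and is woken with unpark(). When the caller is not a ForkJoinPool worker, ForkJoinPool.managedBlock(..) simply keeps calling isReleasable()/block() until one of them reports that no more blocking is needed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical one-shot blocker in the style of Phaser.QNode: parks until open() is called. */
final class FlagBlocker implements ForkJoinPool.ManagedBlocker {
    private final Thread waiter = Thread.currentThread(); // like QNode: created by the waiting thread
    private volatile boolean open;

    @Override
    public boolean isReleasable() {
        return open;                                      // no need to block once open
    }

    @Override
    public boolean block() throws InterruptedException {
        while (!open) {
            if (Thread.interrupted()) throw new InterruptedException();
            LockSupport.park(this);                       // may return spuriously: the loop re-checks
        }
        return true;                                      // true => no further blocking needed
    }

    void open() {
        open = true;
        LockSupport.unpark(waiter);                       // harmless if the waiter has not parked yet
    }

    /** Blocks the calling thread via managedBlock until another thread opens the blocker. */
    static boolean demo() throws InterruptedException {
        FlagBlocker blocker = new FlagBlocker();
        Thread opener = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            blocker.open();
        });
        opener.start();
        ForkJoinPool.managedBlock(blocker);               // not in a pool: loops over isReleasable()/block()
        opener.join();
        return blocker.isReleasable();
    }
}
```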

Once arrive() and awaitAdvance() are understood, arriveAndAwaitAdvance() is simply a combination of the two, so it is not analyzed further here.