第4章 同步工具类
除了锁与Condition,Concurrent 包还提供了一系列同步工具类。这些同步工具类的原理,有些也是基于AQS的,有些则需要特殊的实现机制。
Semaphore
Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示。
// 初始有10个共享资源;第二个参数是是否公平选项
Semaphore semaphore = new Semaphore(10, true);
try {
// 每次获取一个资源,获取不到,则线程就会阻塞
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 用完释放
semaphore.release();
}
如图所示,假设有n个线程来获取Semaphore里面的资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。
当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如图所示。
public class Semaphore implements java.io.Serializable {
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
}
Semaphore和锁的实现原理基本相同,资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。
CountDownLatch
CountDownLatch使用场景
考虑一个场景:一个主线程要等待10个Worker 线程工作完毕才退出,就能使用CountDownLatch来实现。
CountDownLatch doneSignal = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// do something
doneSignal.countDown();
}).start();
}
try {
doneSignal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
如图所示为CountDownLatch相关类的继承层次,CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。
await()实现分析
如下所示,await()调用的是AQS 的模板方法,这个方法在前面已经介绍过。CountDownLatch.Sync重新实现了tryAccuqireShared方法。
public class CountDownLatch {
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
从tryAcquireShared(..)方法的实现来看,只要state!=0,调用await()方法的线程便会被放入AQS的阻塞队列,进入阻塞状态。
countDown()实现分析
public class CountDownLatch {
public void countDown() {
sync.releaseShared(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
countDown()调用的AQS 的模板方法releaseShared(),里面的tryReleaseShared(..)被CountDownLatch.Sync重新实现。从上面的代码可以看出,只有state=0,tryReleaseShared(..)才会返回true,然后执行doReleaseShared(..),一次性唤醒队列中所有阻塞的线程。
最后做一下小结:因为是基于AQS 阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直累减state,减到0后一次性唤醒所有线程。如图所示,假设初始总数为M,N个线程await(),M个线程countDown(),减到0之后,N个线程被唤醒。
CyclicBarrier
CyclicBarrier使用场景
CyclicBarrier使用代码也很简单,如下所示。
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
考虑这样一个场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把10个人看作10个线程,10个线程之间的同步过程如图4-5所示。
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。具体到每个线程的run()方法中,就是下面的伪代码:
CyclicBarrier实现原理
CyclicBarrier基于ReentrantLock+Condition实现。
public class CyclicBarrier {
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
}
先从构造函数说起,可以看到,不仅可以传入参与方的总数量,还可以传入一个回调函数。当所有的线程被唤醒时,barrierAction被执行。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
接下来看一下await()函数的实现过程。
public class CyclicBarrier {
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
}
关于上面的函数,有几点说明:
- CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
- CyclicBarrier 会响应中断。10个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。
- 上面的回调函数,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。
Exchanger
使用场景
Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(..)函数,使用示例如下:
在上面的例子中,4个线程并发地调用exchange(..),会两两交互数据,如A/B、C/D、A/C、B/D、A/D或B/C。
实现原理(TODO)
TODO
Phaser
用Phaser替代CyclicBarrier和CountDownLatch
从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。
用Phaser替代CountDownLatch
考虑讲CountDownLatch时的例子,1个主线程要等10个worker线程完成之后,才能做接下来的事情,也可以用Phaser来实现此功能。在CountDownLatch中,主要是2个函数:await()和countDown(),在Phaser中,与之相对应的函数是awaitAdance(int n)和arrive()。
Phaser phaser = new Phaser(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// do something
phaser.arrive();
}).start();
}
phaser.awaitAdvance(phaser.getPhase());
用Phaser替代CyclicBarrier
考虑前面讲CyclicBarrier时,10个工程师去公司应聘的例子,也可以用Phaser实现,代码基本类似。
arriveAndAwaitAdance()就是arrive()与awaitAdvance(int)的组合,表示“我自己已到达这个同步点,同时要等待所有人都到达这个同步点,然后再一起前行”。
Phaser新特性
特性1:动态调整线程个数
CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser 可以在运行期间动态地调整要同步的线程个数。Phaser 提供了下面这些函数来增加、减少所要同步的线程个数。
public class Phaser {
public int register() {
return doRegister(1);
}
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
}
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
}
层次Phaser
多个Phaser可以组成如图所示的树状结构,可以通过在构造函数中传入父Phaser来实现。
先简单看一下Phaser内部关于树状结构的存储,如下面代码所示。
public class Phaser {
/**
* The parent of this phaser, or null if none
*/
private final Phaser parent;
}
可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个Phaser知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。
树状的Phaser怎么使用呢?考虑如下代码,会组成如图所示的树状Phaser。
Phaser root = new Phaser(2);
Phaser c1 = new Phaser(root, 3);
Phaser c2 = new Phaser(root, 2);
Phaser c3 = new Phaser(c1, 0);
本来root有两个参与者,然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与者,root的参与者就变成2+2=4个。c1本来有3个参与者,为其加入了一个子Phaserc3,参与者数量变成3+1=4个。c3的参与者初始为0,后续可以通过调用register()函数加入。
对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是一样的。具体来讲:父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解注册。父Phaser把子Phaser当作一个正常参与的线程就可以了。
state变量解析
大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理。Phaser没有基于AQS来实现,但具备AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。
public class Phaser {
/**
* Primary state representation, holding four bit-fields:
*
* unarrived -- the number of parties yet to hit barrier (bits 0-15)
* parties -- the number of parties to wait (bits 16-31)
* phase -- the generation of the barrier (bits 32-62)
* terminated -- set if barrier is terminated (bit 63 / sign)
*
* Except that a phaser with no registered parties is
* distinguished by the otherwise illegal state of having zero
* parties and one unarrived parties (encoded as EMPTY below).
*
* To efficiently maintain atomicity, these values are packed into
* a single (atomic) long. Good performance relies on keeping
* state decoding and encoding simple, and keeping race windows
* short.
*
* All state updates are performed via CAS except initial
* registration of a sub-phaser (i.e., one with a non-null
* parent). In this (relatively rare) case, we use built-in
* synchronization to lock while first registering with its
* parent.
*
* The phase of a subphaser is allowed to lag that of its
* ancestors until it is actually accessed -- see method
* reconcileState.
*/
private volatile long state;
}
这个64位的state变量被拆成4部分,如图4-9所示为state变量各部分表示的意思。
最高位0表示未同步完成,1表示同步完成,初始最高位为0。
Phaser提供了一系列的成员函数来从state中获取图中的几个数字,如下所示。
public class Phaser {
private static final int PHASE_SHIFT = 32;
public final int getPhase() {
return (int)(root.state >>> PHASE_SHIFT);
}
public boolean isTerminated() {
return root.state < 0L;
}
public int getRegisteredParties() {
return partiesOf(state);
}
private static final int PARTIES_SHIFT = 16;
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
public int getUnarrivedParties() {
return unarrivedOf(reconcileState());
}
private static final int UNARRIVED_MASK = 0xffff;
private static int unarrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
}
下面再看一下state变量在构造函数中是如何被赋值的:
public class Phaser {
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
}
当parties=0时,state被赋予一个EMPTY常量,常量为1;
当parties!=0时,把phase值左移32位;把parties左移16位;然后parties也作为最低的16位,3个值做或操作,赋值给state。这个赋值操作也反映了上图的意思。
阻塞与唤醒(Treiber Stack)
基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如图所示,右边的主线程会调用awaitAdvance()进行阻塞;左边的arrive()会对state进行CAS的累减操作,当未到达的线程数减到0时,唤醒右边阻塞的主线程。
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个无锁的栈,由R.Kent Treiber在其于1986年发表的论文SystemsProgramming:Coping with Para llelism 中首次提出。它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。实现代码如下所示。
public class Phaser {
/**
* Wait nodes for Treiber stack representing wait queue
*/
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;
}
/**
* Heads of Treiber stacks for waiting threads. To eliminate
* contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
* Subphasers share queues with root to speed up releases.
*/
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
}
为了减少并发冲突,这里定义了2个链表,也就是2个TreiberStack。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。代码如下所示。
public class Phaser {
private AtomicReference<QNode> queueFor(int phase) {
return ((phase & 1) == 0) ? evenQ : oddQ;
}
}
arrive()函数分析
下面看arrive()函数是如何对state变量进行操作,又是如何唤醒线程的。
public class Phaser {
public int arrive() {
return doArrive(ONE_ARRIVAL);
}
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
}
arrive()和arriveAndDeregister()内部调用的都是doArrive(boolean)函数。区别在于前者只是把“未达到线程数”减1;后者则把“未到达线程数”和“下一轮的总线程数”都减1。下面看一下doArrive(boolean)函数的实现。
public class Phaser {
/**
* Main implementation for methods arrive and arriveAndDeregister.
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
* @param adjust value to subtract from state;
* ONE_ARRIVAL for arrive,
* ONE_DEREGISTER for arriveAndDeregister
*/
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
}
关于上面的函数,有以下几点说明:
-
定义了2个常量如下。当deregister=false 时,只最低的16位需要减1,adj=ONE_ARRIVAL;当deregister=true时,低32位中的2个16位都需要减1,adj=ONE_ARRIVAL|ONE_PARTY。
public class Phaser { private static final int PARTIES_SHIFT = 16; private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; }
-
把未到达线程数减1。减了之后,如果还未到0,什么都不做,直接返回。如果到0,会做2件事情:第1,重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加1;第2,唤醒队列中的线程。
下面看一下唤醒函数:
public class Phaser {
/**
* Removes and signals threads from queue for phase.
*/
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
}
遍历整个栈,只要栈当中节点的phase不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的,应该被释放并唤醒。
接下来看一下线程是如何被阻塞的。
awaitAdvance()函数分析
public class Phaser {
/**
* Awaits the phase of this phaser to advance from the given phase
* value, returning immediately if the current phase is not equal
* to the given phase value or this phaser is terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
* previous call to {@code arrive} or {@code arriveAndDeregister}.
* @return the next arrival phase number, or the argument if it is
* negative, or the (negative) {@linkplain #getPhase() current phase}
* if terminated
*/
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
}
下面的while循环中有4个分支:初始的时候,node==null,进入第1个分支进行自旋,自旋次数满足之后,会新建一个QNode节点;之后执行第3、第4个分支,分别把该节点入栈并阻塞。
public class Phaser {
/**
* Possibly blocks and waits for phase to advance unless aborted.
* Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
* if null, denotes noninterruptible wait
* @return current phase
*/
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
}
这里调用了ForkJoinPool.managedBlock(ManagedBlockerblocker)函数,目的是把node对应的线程阻塞。ManagerdBlocker是ForkJoinPool里面的一个接口,定义如下:
public static interface ManagedBlocker {
boolean block() throws InterruptedException;
boolean isReleasable();
}
QNode实现了该接口,实现原理还是park(),如下所示。之所以没有直接使用park()/unpark()来实现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑。一方面是park()可能被中断唤醒,另一方面是带超时时间的park(),把这二者都封装在一起。
public class Phaser {
static final class QNode implements ForkJoinPool.ManagedBlocker {
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
}
理解了arrive()和awaitAdvance(),arriveAndAwaitAdvance()就是二者的一个组合版本,此处不再展开分析。