第五章 并发容器
BlockingQueue
在所有的并发容器中,BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。
在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类,如图所示。
该接口的定义如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
...
}
可以看到,该接口和JDK集合包中的Queue接口是兼容的,同时在其基础上增加了阻塞功能。在这里,入队提供了add(..)、offer(..)、put (..)3个函数,有什么区别呢?从上面的定义可以看到,add(..)和offer(..)的返回值是布尔类型,而put无返回值,还会抛出中断异常,所以add(..)和offer(..)是无阻塞的,也是Queue本身定义的接口,而put(..)是阻塞式的。add(..)和offer(..)的区别不大,当队列为满的时候,前者会抛出异常,后者则直接返回false。
出队列与之类似,提供了remove()、peek()、take()等函数,remove()和peek()是非阻塞式的,take()是阻塞式的。
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的环形队列,在构造函数中,会要求传入数组的容量。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
其核心数据结构如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
其put/take函数也很简单,如下所示。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
}
LinkedBlockingQueue
LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
}
在其构造函数中,也可以指定队列的总容量。如果不指定,默认为Integer.MAX_VALUE。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
}
下面看一下其put/take实现。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
}
LinkedBlockingQueue和ArrayBlockingQueue的实现有一些差异,有几点要特别说明:
-
为了提高并发度,用2把锁,分别控制队头、队尾的操作。意味着在put(..)和put(..)之间、take()与take()之间是互斥的,put(..)和take()之间并不互斥。但对于count变量,双方都需要操作,所以必须是原子类型。
-
因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁,就是signalNotEmpty()和signalNotFull()函数。示例如下所示。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } }
-
不仅put会通知take,take 也会通知put。当put 发现非满的时候,也会通知其他put线程;当take发现非空的时候,也会通知其他take线程。
PriorityBlockingQueue
队列通常是先进先出的,而PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。
其核心数据结构如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/**
* The number of elements in the priority queue.
*/
private transient int size;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
}
其构造函数如下所示,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
}
下面是对应的put/take函数的实现。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
}
从上面可以看到,在阻塞的实现方面,和ArrayBlockingQueue的机制相似,主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有notFull条件,当元素个数超出数组长度时,执行扩容操作。
DelayQueue
DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是“未来将要执行的时间”-“当前时间”。为此,放入DelayQueue中的元素,必须实现Delayed接口,如下所示。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
关于该接口,有两点说明:
- 如果getDelay的返回值小于或等于0,则说明该元素到期,需要从队列中拿出来执行。
- 该接口首先继承了Comparable 接口,所以要实现该接口,必须实现Comparable 接口。具体来说,就是基于getDelay()的返回值比较两个元素的大小。
下面看一下DelayQueue的核心数据结构。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
}
下面介绍put/take的实现,先从take说起,因为这样更能看出DelayQueue的特性。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
关于take()函数,有两点需要说明:
- 不同于一般的阻塞队列,只在队列为空的时候,才阻塞。如果堆顶元素的延迟时间没到,也会阻塞。
- 在上面的代码中使用了一个优化技术,用一个Threadleader变量记录了等待堆顶元素的第1个线程。为什么这样做呢?通过getDelay(..)可以知道堆顶元素何时到期,不必无限期等待,可以使用condition.awaitNanos()等待一个有限的时间;只有当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要无限期等待。
下面看一下put的实现。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
}
关于上面的实现,有一点要说明:不是每放入一个元素,都需要通知等待的线程。放入的元素,如果其延迟时间大于当前堆顶的元素延迟时间,就没必要通知等待的线程;只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程,也就是上面代码中的if(q.peek()==e)段落。
SynchronousQueue
SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(..),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。对于多个线程而言,例如3个线程,调用3次put(..),3个线程都会阻塞;直到另外的线程调用3次take(),6个线程才同时解锁,反之亦然。
在讲解线程池中的CachedThreadPool实现的时候会使用到SynchronousQueue的这种特性。
接下来就看一下,SynchronousQueue是如何实现的。先从构造函数看起。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
}
和锁一样,也有公平和非公平模式。如果是公平模式,则用TransferQueue实现;如果是非公平模式,则用TransferStack实现。这两个类分别是什么呢?先看一下put/take的实现。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
}
可以看到,put/take都调用了transfer(..)接口。而TransferQueue和TransferStack分别实现了这个接口。该接口在SynchronousQueue内部,如下所示。如果是put(..),则第1个参数就是对应的元素;如果是take(),则第1个参数为null。后2个参数分别为是否设置超时和对应的超时时间。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
}
接下来看一下什么是公平模式和非公平模式,如图所示。假设3个线程分别调用了put(..),3个线程会进入阻塞状态,直到其他线程调用3次take(),和3个put(..)一一配对。
如果是公平模式(队列模式),则第1个调用put(..)的线程1会在队列头部,第1个到来的take()线程和它进行配对,遵循先到先配对的原则,所以是公平的;如果是非公平模式(栈模式),则第3个调用put(..)的线程3会在栈顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则,所以是非公平的。
下面分别看一下TransferQueue和TransferStack的实现
TransferQueue
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
}
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;
}
}
从上面的代码可以看出,TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头部和尾部。初始的时候,head和tail会指向一个空节点,构造函数如下所示。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static final class TransferQueue<E> extends Transferer<E> {
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
}
}
下图所示为TransferQueue的工作原理。
- 阶段(a):队列中是一个空的节点,head/tail都指向这个空节点。
- 阶段(b):3个线程分别调用put,生成3个QNode,进入队列。
- 阶段(c):来了一个线程调用take,会和队列头部的第1个QNode进行配对。
- 阶段(d):第1个QNode出队列。
这里有一个关键点:put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点。
接下来看一下TransferQueue的代码实现。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static final class TransferQueue<E> extends Transferer<E> {
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 队列还未初始化;自旋等待
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 队列为空或者当前线程和队列中元素为同一种模式
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 不一致读,重新执行for循环
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
// 新建一个节点
s = new QNode(e, isData);
// 新建的节点加入尾部
if (!t.casNext(null, s)) // failed to link in
continue;
// 后裔tail指针
advanceTail(t, s); // swing tail and wait
// 进入阻塞状态
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 从阻塞中被唤醒,确定已经处于队列中的第一个元素
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 当前线程可以和队列中的第一个元素进行配对
// 取队列中的第一个元素
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
// 不一致读,重新执行for循环
continue; // inconsistent read
Object x = m.item;
// 已经配对过
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // 尝试配对
// 已经配对过,出队列
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
// 唤醒队列中第一个元素对应的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
}
}
整个for 循环有两个大的if-else 分支,如果当前线程和队列中的元素是同一种模式(都是put节点或者take节点),则与当前线程对应的节点被加入队列尾部并且阻塞;如果不是同一种模式,则选取队列头部的第1个元素进行配对。
这里的配对就是m.casItem(x,e),把自己的item x换成对方的item e,如果CAS操作成功,则配对成功。如果是put节点,则isData=true,item!=null;如果是take节点,则isData=false,item=null。如果CAS操作不成功,则isData和item之间将不一致,也就是isData!=(x!=null),通过这个条件可以判断节点是否已经被匹配过了。
TransferStack
TransferStack的定义如下所示,首先,它也是一个单向链表。不同于队列,只需要head指针就能实现入栈和出栈操作。
链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static final class TransferStack<E> extends Transferer<E> {
/** Node represents an unfulfilled consumer */
static final int REQUEST = 0;
/** Node represents an unfulfilled producer */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
}
/** The head (top) of the stack */
volatile SNode head;
}
}
如图所示为TransferStack的工作原理。
- 阶段(a):head指向NULL。不同于TransferQueue,这里没有空的头节点。
- 阶段(b):3个线程调用3次put,依次入栈。
- 阶段(c):线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈。
- 阶段(d):栈顶的2个元素同时入栈。
下面看一下具体的代码实现。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static final class TransferStack<E> extends Transferer<E> {
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 同一种模式
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈
SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 非同一种模式,待匹配
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 生成一个FULFILLING节点,入栈
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
// 两个节点一起出栈
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // 已经匹配过了,出栈
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // 配对,一起出栈
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
}
}
BlockingDeque
BlockingDeque定义了一个阻塞的双端队列接口,如下所示。
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
...
}
可以看到,该接口在继承了BlockingQueue接口的同时,增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。
其核心数据结构如下所示,是一个双向链表。
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/** Doubly-linked list node class */
static final class Node<E> {
/**
* The item, or null if this node has been removed.
*/
E item;
/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
Node<E> prev;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
Node<E> next;
Node(E x) {
item = x;
}
}
/**
* Pointer to first node.
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
}
对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
}
CopyOnWrite
CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。下面通过几个案例来展现CopyOnWrite的应用。
CopyOnWriteArrayList
和ArrayList一样,CopyOnWriteArrayList的核心数据结构也是一个数组,代码如下。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
}
下面是CopyOnArrayList的几个“读”函数:
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
public E get(int index) {
return get(getArray(), index);
}
public boolean isEmpty() {
return size() == 0;
}
public boolean contains(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length) >= 0;
}
public int indexOf(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);
}
public int indexOf(E e, int index) {
Object[] elements = getArray();
return indexOf(e, elements, index, elements.length);
}
}
这些“读”函数都没有加锁,那么是如何保证“线程安全”呢?答案在“写”函数里面。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
setArray(newElements);
} finally {
lock.unlock();
}
}
}
其他“写”函数,例如remove和add类似。
CopyOnWriteArraySet
CopyOnWriteArraySet 就是用Array 实现的一个Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList。
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private final CopyOnWriteArrayList<E> al;
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
public boolean add(E e) {
return al.addIfAbsent(e);
}
}
ConcurrentLinkedQueue/Deque
前面详细分析了AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。
ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
首先,它是一个单向链表,定义如下。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
}
其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定前移一个位置,以保证head指向队列头部,tail指向链表尾部。
但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对head/tail指针进行CAS操作的,而是对Node中的item进行操作。下面进行详细分析:
-
初始化
初始的时候,如图所示,head和tail都执行一个NULL节点。对应的代码如下。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } }
-
入队列
代码如下所示。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node // 关键点:是对tail的next指针而不是对tail指针执行CAS操作 if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 已经到达队列尾部 p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // 后移p指针 p = (p != t && t != (t = tail)) ? t : q; } } }
上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点,具体过程如图所示。
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
-
step1:p=tail,q=p.next=NULL;
-
step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail函数不会执行,直接返回。此时tail指针没有变化。
之后,假设线程2要入队item3节点,如图5-7所示。
-
step3:p=tail,q=p.next.
-
step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
-
step5:q=NULL,对p的next执行CAS操作,入队item3节点。
-
step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。
最后总结一下入队列的两个关键点:
- 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
- 只有当p!=tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。
-
-
出队列
上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 关键点:在出队列的时候,并没有移动head指针,而是把item设置为NULL if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // 每产生2个NULL节点,才把head指针后移2位 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } }
出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3三个节点。
- step1:p=head,q=p.next.p!=q.
- step2:后移p指针,使得p=q。
- step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
- step4:p!=head,此时队列中有了2个NULL 节点,再前移1次head指针,对其执行updateHead操作。
最后总结一下出队列的关键点:
- 出队列的判断并非观察tail 指针的位置,而是依赖于head指针后续的节点是否为NULL这一条件。
- 只要对节点的item 执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。
-
队列判空
因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { public boolean isEmpty() { return first() == null; } Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } } }
ConcurrentHashMap
HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个基本原理之上进行了各种优化,在JDK 7和JDK 8中的实现方式有很大差异,下面分开讨论。
JDK 7中的实现方式(TODO)
为了提高并发度,在JDK7中,一个HashMap被拆分为多个子HashMap。每一个子HashMap称作一个Segment,多个线程操作多个Segment相互独立,如图所示。
TODO
JDK 8中的实现方式(TODO)
TODO
ConcurrentSkipListMap/Set
ConcurrentHashMap 是一种key 无序的HashMap,HashMap 则是key有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。
ConcurrentSkipListMap
为什么要使用SkipList实现Map?
在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
借用Doug Lea的原话:也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。
那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。
无锁链表
Doug Lea在注释中引用了一篇无锁链表的论文:A pragmaticimplementation of non-blocking linked lists。
表面上看,无锁链表是很简单的,根本不需要写一篇论文来专门论述。在上文中讲解AQS时,曾反复用到无锁队列,其实现也是链表。究竟二者的区别在哪呢?
上文所讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
关于这个问题,Doug Lea的论文中有清晰的论述,此处引用如下:
操作1:在节点10后面插入节点20。如图5-14所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。
操作2:删除节点10。如图5-15所示,只需把头节点的next指针,进行CAS操作到节点30即可。
但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。
为什么会出现这个问题呢?
究其原因:在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
针对这个问题,在论文中提出了如下的解决办法,如图5-17所示,把节点10的删除分为两2步:
- 第一步,把节点10的next指针,mark成删除,即软删除;
- 第二步,找机会,物理删除。
做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1个CAS操作里面完成!
具体的实现有两个办法:
- 办法一:AtomicMarkableReference:保证每个next 是AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
- 办法2:Mark节点:我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否执行了一个Marker节点,这两个操作可以在一个CAS操作里面完成。
跳查表(TODO)
TODO
解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的。下面先看一下跳查表的数据结构(下面所用代码都引用自JDK7,JDK 8中的代码略有差异,但不影响下面的原理分析)。
ConcurrentSkipListSet
ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。