第五章 并发容器

BlockingQueue

在所有的并发容器中,BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。

在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类,如图所示。

BlockingQueue的各种实现类

该接口的定义如下:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    E take() throws InterruptedException;
    ...
}

可以看到,该接口和JDK集合包中的Queue接口是兼容的,同时在其基础上增加了阻塞功能。在这里,入队提供了add(..)、offer(..)、put (..)3个函数,有什么区别呢?从上面的定义可以看到,add(..)和offer(..)的返回值是布尔类型,而put无返回值,还会抛出中断异常,所以add(..)和offer(..)是无阻塞的,也是Queue本身定义的接口,而put(..)是阻塞式的。add(..)和offer(..)的区别不大,当队列为满的时候,前者会抛出异常,后者则直接返回false。

出队列与之类似,提供了remove()、peek()、take()等函数,remove()和peek()是非阻塞式的,take()是阻塞式的。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的环形队列,在构造函数中,会要求传入数组的容量。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
}

其核心数据结构如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
}

其put/take函数也很简单,如下所示。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

LinkedBlockingQueue

LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();    
}

在其构造函数中,也可以指定队列的总容量。如果不指定,默认为Integer.MAX_VALUE。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }    
}

下面看一下其put/take实现。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
}

LinkedBlockingQueue和ArrayBlockingQueue的实现有一些差异,有几点要特别说明:

  1. 为了提高并发度,用2把锁,分别控制队头、队尾的操作。意味着在put(..)和put(..)之间、take()与take()之间是互斥的,put(..)和take()之间并不互斥。但对于count变量,双方都需要操作,所以必须是原子类型。

  2. 因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁,就是signalNotEmpty()和signalNotFull()函数。示例如下所示。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        /**
         * Signals a waiting take. Called only from put/offer (which do not
         * otherwise ordinarily lock takeLock.)
         */
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        /**
         * Signals a waiting put. Called only from take/poll.
         */
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    }
    
  3. 不仅put会通知take,take 也会通知put。当put 发现非满的时候,也会通知其他put线程;当take发现非空的时候,也会通知其他take线程。

PriorityBlockingQueue

队列通常是先进先出的,而PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。

其核心数据结构如下:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;
}

其构造函数如下所示,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
}

下面是对应的put/take函数的实现。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    public void put(E e) {
        offer(e); // never need to block
    }
    
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
}

从上面可以看到,在阻塞的实现方面,和ArrayBlockingQueue的机制相似,主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有notFull条件,当元素个数超出数组长度时,执行扩容操作。

DelayQueue

DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是“未来将要执行的时间”-“当前时间”。为此,放入DelayQueue中的元素,必须实现Delayed接口,如下所示。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

关于该接口,有两点说明:

  1. 如果getDelay的返回值小于或等于0,则说明该元素到期,需要从队列中拿出来执行。
  2. 该接口首先继承了Comparable 接口,所以要实现该接口,必须实现Comparable 接口。具体来说,就是基于getDelay()的返回值比较两个元素的大小。

下面看一下DelayQueue的核心数据结构。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private final transient ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
}

下面介绍put/take的实现,先从take说起,因为这样更能看出DelayQueue的特性。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
}

关于take()函数,有两点需要说明:

  1. 不同于一般的阻塞队列,只在队列为空的时候,才阻塞。如果堆顶元素的延迟时间没到,也会阻塞。
  2. 在上面的代码中使用了一个优化技术,用一个Threadleader变量记录了等待堆顶元素的第1个线程。为什么这样做呢?通过getDelay(..)可以知道堆顶元素何时到期,不必无限期等待,可以使用condition.awaitNanos()等待一个有限的时间;只有当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要无限期等待。

下面看一下put的实现。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}

关于上面的实现,有一点要说明:不是每放入一个元素,都需要通知等待的线程。放入的元素,如果其延迟时间大于当前堆顶的元素延迟时间,就没必要通知等待的线程;只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程,也就是上面代码中的if(q.peek()==e)段落。

SynchronousQueue

SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(..),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。对于多个线程而言,例如3个线程,调用3次put(..),3个线程都会阻塞;直到另外的线程调用3次take(),6个线程才同时解锁,反之亦然。

在讲解线程池中的CachedThreadPool实现的时候会使用到SynchronousQueue的这种特性。

接下来就看一下,SynchronousQueue是如何实现的。先从构造函数看起。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    public SynchronousQueue() {
        this(false);
    }
    
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
}

和锁一样,也有公平和非公平模式。如果是公平模式,则用TransferQueue实现;如果是非公平模式,则用TransferStack实现。这两个类分别是什么呢?先看一下put/take的实现。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
}

可以看到,put/take都调用了transfer(..)接口。而TransferQueue和TransferStack分别实现了这个接口。该接口在SynchronousQueue内部,如下所示。如果是put(..),则第1个参数就是对应的元素;如果是take(),则第1个参数为null。后2个参数分别为是否设置超时和对应的超时时间。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
}

接下来看一下什么是公平模式和非公平模式,如图所示。假设3个线程分别调用了put(..),3个线程会进入阻塞状态,直到其他线程调用3次take(),和3个put(..)一一配对。

如果是公平模式(队列模式),则第1个调用put(..)的线程1会在队列头部,第1个到来的take()线程和它进行配对,遵循先到先配对的原则,所以是公平的;如果是非公平模式(栈模式),则第3个调用put(..)的线程3会在栈顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则,所以是非公平的。

公平模式与非公平模式对比

下面分别看一下TransferQueue和TransferStack的实现

TransferQueue

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferQueue<E> extends Transferer<E> {
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
        }
        
        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it was cancelled.
         */
        transient volatile QNode cleanMe;
    }
}

从上面的代码可以看出,TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头部和尾部。初始的时候,head和tail会指向一个空节点,构造函数如下所示。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferQueue<E> extends Transferer<E> {
        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }
    }
}

下图所示为TransferQueue的工作原理。

  1. 阶段(a):队列中是一个空的节点,head/tail都指向这个空节点。
  2. 阶段(b):3个线程分别调用put,生成3个QNode,进入队列。
  3. 阶段(c):来了一个线程调用take,会和队列头部的第1个QNode进行配对。
  4. 阶段(d):第1个QNode出队列。

TransferQueue的工作原理

这里有一个关键点:put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点。

接下来看一下TransferQueue的代码实现。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferQueue<E> extends Transferer<E> {
        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                // 队列还未初始化;自旋等待
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                // 队列为空或者当前线程和队列中元素为同一种模式
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    // 不一致读,重新执行for循环
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        // 新建一个节点
                        s = new QNode(e, isData);
                    // 新建的节点加入尾部
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    // 后裔tail指针
                    advanceTail(t, s);              // swing tail and wait
                    // 进入阻塞状态
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    // 从阻塞中被唤醒,确定已经处于队列中的第一个元素
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    // 当前线程可以和队列中的第一个元素进行配对
                    
                    // 取队列中的第一个元素
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        // 不一致读,重新执行for循环
                        continue;                   // inconsistent read

                    Object x = m.item;
                    // 已经配对过
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // 尝试配对
                        // 已经配对过,出队列
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    // 唤醒队列中第一个元素对应的线程
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }
    }
}

整个for 循环有两个大的if-else 分支,如果当前线程和队列中的元素是同一种模式(都是put节点或者take节点),则与当前线程对应的节点被加入队列尾部并且阻塞;如果不是同一种模式,则选取队列头部的第1个元素进行配对。

这里的配对就是m.casItem(x,e),把自己的item x换成对方的item e,如果CAS操作成功,则配对成功。如果是put节点,则isData=true,item!=null;如果是take节点,则isData=false,item=null。如果CAS操作不成功,则isData和item之间将不一致,也就是isData!=(x!=null),通过这个条件可以判断节点是否已经被匹配过了。

TransferStack

TransferStack的定义如下所示,首先,它也是一个单向链表。不同于队列,只需要head指针就能实现入栈和出栈操作。

链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferStack<E> extends Transferer<E> { 
        /** Node represents an unfulfilled consumer */
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;
        
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
        }
        
        /** The head (top) of the stack */
        volatile SNode head;
    }
}

如图所示为TransferStack的工作原理。

  1. 阶段(a):head指向NULL。不同于TransferQueue,这里没有空的头节点。
  2. 阶段(b):3个线程调用3次put,依次入栈。
  3. 阶段(c):线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈。
  4. 阶段(d):栈顶的2个元素同时入栈。

TransferStack的工作原理

下面看一下具体的代码实现。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferStack<E> extends Transferer<E> { 
        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                // 同一种模式
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈
                        SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // 非同一种模式,待匹配
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        // 生成一个FULFILLING节点,入栈
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                // 两个节点一起出栈
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // 已经匹配过了,出栈
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // 配对,一起出栈
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
    }
}

BlockingDeque

BlockingDeque定义了一个阻塞的双端队列接口,如下所示。

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    void putFirst(E e) throws InterruptedException;
    void putLast(E e) throws InterruptedException;
    E takeFirst() throws InterruptedException;
    E takeLast() throws InterruptedException;
    ...
}

可以看到,该接口在继承了BlockingQueue接口的同时,增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。

其核心数据结构如下所示,是一个双向链表。

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {    
    /** Doubly-linked list node class */
    static final class Node<E> {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node<E> prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /**
     * Pointer to first node.
     * Invariant: (first == null && last == null) ||
     *            (first.prev == null && first.item != null)
     */
    transient Node<E> first;

    /**
     * Pointer to last node.
     * Invariant: (first == null && last == null) ||
     *            (last.next == null && last.item != null)
     */
    transient Node<E> last;

    /** Number of items in the deque */
    private transient int count;

    /** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
}

对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
}

CopyOnWrite

CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。下面通过几个案例来展现CopyOnWrite的应用。

CopyOnWriteArrayList

和ArrayList一样,CopyOnWriteArrayList的核心数据结构也是一个数组,代码如下。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {    
    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
}

下面是CopyOnArrayList的几个“读”函数:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {    
    public E get(int index) {
        return get(getArray(), index);
    }
    
    public boolean isEmpty() {
        return size() == 0;
    }
    
    public boolean contains(Object o) {
        Object[] elements = getArray();
        return indexOf(o, elements, 0, elements.length) >= 0;
    }
    
    public int indexOf(Object o) {
        Object[] elements = getArray();
        return indexOf(o, elements, 0, elements.length);
    }
    
    public int indexOf(E e, int index) {
        Object[] elements = getArray();
        return indexOf(e, elements, index, elements.length);
    }
}

这些“读”函数都没有加锁,那么是如何保证“线程安全”呢?答案在“写”函数里面。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {    
    public void add(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException("Index: "+index+
                                                    ", Size: "+len);
            Object[] newElements;
            int numMoved = len - index;
            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        } finally {
            lock.unlock();
        }
    }
}

其他“写”函数,例如remove和add类似。

CopyOnWriteArraySet

CopyOnWriteArraySet 就是用Array 实现的一个Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList。

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
    private final CopyOnWriteArrayList<E> al;
    
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }
    
    public boolean add(E e) {
        return al.addIfAbsent(e);
    }
}

ConcurrentLinkedQueue/Deque

前面详细分析了AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。

ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。

首先,它是一个单向链表,定义如下。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }
    
    private transient volatile Node<E> head;
    
    private transient volatile Node<E> tail;
}

其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定前移一个位置,以保证head指向队列头部,tail指向链表尾部。

但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对head/tail指针进行CAS操作的,而是对Node中的item进行操作。下面进行详细分析:

  1. 初始化

    初始的时候,如图所示,head和tail都执行一个NULL节点。对应的代码如下。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
        public ConcurrentLinkedQueue() {
            head = tail = new Node<E>(null);
        }
    }
    

    初始状态

  2. 入队列

    代码如下所示。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
        public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);
    
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // p is last node
                    // 关键点:是对tail的next指针而不是对tail指针执行CAS操作
                    if (p.casNext(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this queue,
                        // and for newNode to become "live".
                        if (p != t) // hop two nodes at a time
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                else if (p == q)
                    // We have fallen off list.  If tail is unchanged, it
                    // will also be off-list, in which case we need to
                    // jump to head, from which all live nodes are always
                    // reachable.  Else the new tail is a better bet.
                    // 已经到达队列尾部
                    p = (t != (t = tail)) ? t : head;
                else
                    // Check for tail updates after two hops.
                    // 后移p指针
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    }
    

    上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点,具体过程如图所示。

    线程1入队item2节点

    初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:

    1. step1:p=tail,q=p.next=NULL;

    2. step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail函数不会执行,直接返回。此时tail指针没有变化。

      之后,假设线程2要入队item3节点,如图5-7所示。

      线程2入队item3节点

    3. step3:p=tail,q=p.next.

    4. step4:q!=NULL,因此不会入队新节点。p,q都后移1位。

    5. step5:q=NULL,对p的next执行CAS操作,入队item3节点。

    6. step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。

    最后总结一下入队列的两个关键点:

    1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
    2. 只有当p!=tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。
  3. 出队列

    上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
        public E poll() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                    E item = p.item;
    
                    // 关键点:在出队列的时候,并没有移动head指针,而是把item设置为NULL
                    if (item != null && p.casItem(item, null)) {
                        // Successful CAS is the linearization point
                        // for item to be removed from this queue.
                        // 每产生2个NULL节点,才把head指针后移2位
                        if (p != h) // hop two nodes at a time
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    }
    

    出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3三个节点。

    1. step1:p=head,q=p.next.p!=q.
    2. step2:后移p指针,使得p=q。
    3. step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
    4. step4:p!=head,此时队列中有了2个NULL 节点,再前移1次head指针,对其执行updateHead操作。

    线程3出队列的过程

    最后总结一下出队列的关键点:

    1. 出队列的判断并非观察tail 指针的位置,而是依赖于head指针后续的节点是否为NULL这一条件。
    2. 只要对节点的item 执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。
  4. 队列判空

    因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
        public boolean isEmpty() {
            return first() == null;
        }
    
        Node<E> first() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                    boolean hasItem = (p.item != null);
                    if (hasItem || (q = p.next) == null) {
                        updateHead(h, p);
                        return hasItem ? p : null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    }
    

ConcurrentHashMap

HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个基本原理之上进行了各种优化,在JDK 7和JDK 8中的实现方式有很大差异,下面分开讨论。

JDK 7中的实现方式(TODO)

为了提高并发度,在JDK7中,一个HashMap被拆分为多个子HashMap。每一个子HashMap称作一个Segment,多个线程操作多个Segment相互独立,如图所示。

JDK 7中ConcurrentHashMap数据结构示意图

TODO

JDK 8中的实现方式(TODO)

TODO

ConcurrentSkipListMap/Set

ConcurrentHashMap 是一种key 无序的HashMap,HashMap 则是key有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。

ConcurrentSkipListMap

为什么要使用SkipList实现Map?

在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。

而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?

借用Doug Lea的原话:也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。

那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。

无锁链表

Doug Lea在注释中引用了一篇无锁链表的论文:A pragmaticimplementation of non-blocking linked lists。

表面上看,无锁链表是很简单的,根本不需要写一篇论文来专门论述。在上文中讲解AQS时,曾反复用到无锁队列,其实现也是链表。究竟二者的区别在哪呢?

上文所讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!

关于这个问题,Doug Lea的论文中有清晰的论述,此处引用如下:

操作1:在节点10后面插入节点20。如图5-14所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。

在节点10和节点30之间插入节点20

操作2:删除节点10。如图5-15所示,只需把头节点的next指针,进行CAS操作到节点30即可。

删除节点10

但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。

两个线程同时操作

为什么会出现这个问题呢?

究其原因:在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!

针对这个问题,在论文中提出了如下的解决办法,如图5-17所示,把节点10的删除分为两2步:

  1. 第一步,把节点10的next指针,mark成删除,即软删除;
  2. 第二步,找机会,物理删除。

删除1个节点的两个步骤

做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1个CAS操作里面完成!

具体的实现有两个办法:

  1. 办法一:AtomicMarkableReference:保证每个next 是AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
  2. 办法2:Mark节点:我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否执行了一个Marker节点,这两个操作可以在一个CAS操作里面完成。

跳查表(TODO)

TODO

解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的。下面先看一下跳查表的数据结构(下面所用代码都引用自JDK7,JDK 8中的代码略有差异,但不影响下面的原理分析)。

ConcurrentSkipListSet

ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。