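Chapter 3 Lock and Condition

Mutual Exclusion Locks

Lock Reentrancy

The main locks in the Concurrent package, such as ReentrantLock and ReentrantReadWriteLock, are "reentrant locks", which is why they are named ReentrantX. "Reentrant" means that after a thread calls object.lock(), acquires the lock and enters the critical section, it can call object.lock() again and still acquire the same lock. Locks should normally be designed this way. Otherwise a thread would deadlock on a lock it already holds.

The synchronized keyword is also reentrant. Consider a typical case: a synchronized method f1() calls another synchronized method f2() on the same object. If synchronized were not reentrant, the thread would block at f2() waiting for a monitor it already holds, which is clearly unacceptable.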
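The hypothetical class ReentrancyDemo below illustrates both kinds of reentrancy. It has a synchronized f1() that calls a synchronized f2(), and a method that acquires a ReentrantLock twice from one thread. getHoldCount() is part of ReentrantLock's public API and reports how many times the current thread holds the lock. This is an illustrative sketch, not JDK code.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // f1() holds this object's monitor and calls f2(), which needs the same monitor.
    // Because synchronized is reentrant, the call to f2() does not block.
    synchronized int f1() {
        return f2() + 1;
    }

    synchronized int f2() {
        return 1;
    }

    // Acquires the same ReentrantLock twice from one thread and reports the hold count.
    int nestedHoldCount() {
        lock.lock();
        try {
            lock.lock();                    // second acquisition succeeds immediately
            try {
                return lock.getHoldCount(); // evaluated before either unlock() runs
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrancyDemo demo = new ReentrancyDemo();
        System.out.println(demo.f1());              // 2
        System.out.println(demo.nestedHoldCount()); // 2
    }
}
```

Each lock() is paired with its own unlock() in a finally block, so the hold count returns to 0 even if the protected code throws.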

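Class Hierarchy

The figure shows the inheritance hierarchy of the classes related to the mutex lock (ReentrantLock).

I denotes an interface, A an abstract class, C a class, and $ an inner class. Solid lines denote inheritance. Dashed lines denote references.

Inheritance hierarchy of the classes related to ReentrantLock

Lock is an interface, defined as follows: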

public interface Lock {
	void lock();
	void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

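The most commonly used methods are lock() and unlock(). A thread waiting in lock() cannot be interrupted. Its counterpart, lockInterruptibly(), can be.

ReentrantLock itself contains almost no logic. The implementation lives in its inner class Sync.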

public class ReentrantLock implements Lock, java.io.Serializable {
	/** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    
    public void lock() {
        sync.lock();
    }
    
    public void unlock() {
        sync.release(1);
    }
}

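Fair and Non-fair Locks

Sync is an abstract class with two subclasses, FairSync and NonfairSync, which correspond to fair and non-fair locks. The ReentrantLock constructors below take a boolean parameter, fair, that specifies whether the lock is fair. The default is non-fair.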

	/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

	/**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

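What are fair and non-fair locks? Consider a real-life example. A person goes to a ticket window at a train station, sees a queue, and joins the end of it, following the rule of first come, first served. That is fair. If instead he skips the queue and rushes straight to the window, that is unfair.

The same applies to locks. If a newly arriving thread sees other threads waiting and joins the end of the queue, that is fair. If it tries to grab the lock as soon as it arrives, that is non-fair. Unlike in real life, the default here is non-fair. This choice improves throughput by reducing thread context switches.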
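The small sketch below shows which strategy each constructor selects, using ReentrantLock's public isFair() method. The class name FairnessFlags is illustrative.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessFlags {
    // The no-arg constructor installs NonfairSync, same as new ReentrantLock(false).
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true installs FairSync.
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());      // false
        System.out.println(explicitFairIsFair()); // true
    }
}
```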

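Basic Principles of the Lock Implementation

Sync's parent class, AbstractQueuedSynchronizer, is often called the queued synchronizer (AQS). This class is crucial and comes up repeatedly below. Its own parent class is AbstractOwnableSynchronizer.

The Atomic classes rely on "spinning", that is, retrying CAS in a loop. The locks in this chapter instead provide what synchronized provides: they can block a thread. A lock that can block and wake threads needs several core elements:

  ① A state variable that records the state of the lock. It needs at least two values, 0 and 1. Operations on state must be thread-safe, which means using CAS.
  ② A record of which thread currently holds the lock.
  ③ Low-level support for blocking and waking a thread.
  ④ A queue that holds all blocked threads. This queue must itself be a thread-safe, lock-free queue, which again requires CAS.

Elements ① and ② appear in the two classes mentioned above: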

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;
}
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * The synchronization state.
     */
    private volatile int state;
}

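state can also take values greater than 1, not just 0 and 1, in order to support reentrancy. For example, if the same thread calls lock() 5 times, state becomes 5. After it calls unlock() 5 times, state drops back to 0.

When state=0, no thread holds the lock, and exclusiveOwnerThread=null.

When state=1, one thread holds the lock, and exclusiveOwnerThread is that thread.

When state>1, the owning thread has re-entered the lock.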
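To make the roles of state and exclusiveOwnerThread concrete, here is a minimal sketch of a reentrant exclusive lock built on AQS. MiniReentrantLock and its Sync are hypothetical names. Unlike the real ReentrantLock, this sketch has no fairness option, no Condition support and no overflow check. It uses only AQS's protected API (getState, setState, compareAndSetState, setExclusiveOwnerThread, getExclusiveOwnerThread) and its public acquire(int) and release(int) template methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniReentrantLock {
    // Hypothetical minimal sync: state = hold count, owner = exclusiveOwnerThread.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Free: try to move state from 0 to acquires with CAS (non-fair).
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Reentrant acquire: only the owner gets here, so a plain write is enough.
                setState(c + acquires);
                return true;
            }
            return false; // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free; // true tells AQS to wake a queued successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        int holdCount() {
            return getState();
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }
    void unlock() { sync.release(1); }
    int state()   { return sync.holdCount(); }

    public static void main(String[] args) {
        MiniReentrantLock lock = new MiniReentrantLock();
        for (int i = 0; i < 5; i++) lock.lock();
        System.out.println(lock.state()); // 5
        for (int i = 0; i < 5; i++) lock.unlock();
        System.out.println(lock.state()); // 0
    }
}
```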

For element ③, the Unsafe class provides a pair of primitives for blocking and waking threads: park and unpark.

    public native void unpark(Object thread);
    public native void park(boolean isAbsolute, long time);

The utility class LockSupport is a thin wrapper around these two primitives:

public class LockSupport {
	public static void park() {
        UNSAFE.park(false, 0L);
    }
    
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
}

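Calling park() in a thread blocks that thread. Another thread can then call unpark(Thread t), passing in the blocked thread, to wake the thread that is blocked in park().

**unpark(Thread t) in particular lets one thread wake a specific other thread, a "precise wake-up".** By contrast, with the wait()/notify() mechanism discussed earlier, notify() also wakes only one thread, but there is no way to choose which one.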
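The sketch below shows this precise wake-up. Two threads park, and unpark(a) wakes only a while b stays blocked. Because park() is allowed to return spuriously, each thread re-checks its own flag in a loop, which is the same reason AQS wraps park() in a for loop. The class and method names are illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class PreciseWakeDemo {
    // A thread that parks until its own flag becomes true.
    // park() may return spuriously, so the flag is re-checked in a loop.
    static Thread waiter(AtomicBoolean flag) {
        return new Thread(() -> {
            while (!flag.get()) {
                LockSupport.park();
            }
        });
    }

    static void join(Thread t) {
        try {
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if unpark(a) woke a while b stayed blocked.
    static boolean run() {
        AtomicBoolean flagA = new AtomicBoolean(false);
        AtomicBoolean flagB = new AtomicBoolean(false);
        Thread a = waiter(flagA);
        Thread b = waiter(flagB);
        a.start();
        b.start();

        flagA.set(true);
        LockSupport.unpark(a);                  // wake exactly thread a
        join(a);
        boolean onlyAWoke = !a.isAlive() && b.isAlive();

        flagB.set(true);                        // clean up: release b as well
        LockSupport.unpark(b);
        join(b);
        return onlyAWoke && !b.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```

If unpark() arrives before the target has called park(), the permit is remembered, so the next park() returns immediately. Wake-ups are therefore not lost in that order either.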

For element ④, AQS implements a blocking queue with a doubly linked list and CAS, as follows:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
    
    private transient volatile Node head;
    private transient volatile Node tail;
}

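The blocking queue is the very core of AQS, so it is worth examining in more detail. As the figure shows, head points to the start of the doubly linked list and tail to its end. Enqueuing appends the new Node after tail and updates tail with CAS. Dequeuing moves head forward by one node. In the AQS source, that head update in setHead(..) is a plain write rather than a CAS, because only the thread that has just acquired the lock performs it.

Diagram of the blocking queue

Initially, head=tail=null. When the first blocked thread is added, an empty (dummy) Node is created first, and both head and tail point to it. The nodes of blocked threads are then appended after it. Therefore, head=tail means the queue is empty.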
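The enqueue path described above can be modeled in a few lines. MiniWaitQueue is a simplified, hypothetical model. It uses AtomicReference for head and tail instead of AQS's own Unsafe- or VarHandle-based CAS, and it omits dequeuing, waitStatus and cancellation. It only shows the lazily created dummy head, the CAS on tail, and the head == tail emptiness test.

```java
import java.util.concurrent.atomic.AtomicReference;

class MiniWaitQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final Thread thread;   // null for the dummy head node
        Node(Thread thread) { this.thread = thread; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node for t at the tail, creating the dummy head lazily.
    Node enqueue(Thread t) {
        Node node = new Node(t);
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail == null) {
                // First use: install an empty dummy node as both head and tail.
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = oldTail;
                if (tail.compareAndSet(oldTail, node)) {
                    oldTail.next = node;   // next is linked only after the tail CAS succeeds
                    return node;
                }
            }
        }
    }

    // head == tail (both null, or both the dummy node) means no thread is waiting.
    // Under concurrency this is only a snapshot.
    boolean isEmpty() {
        return head.get() == tail.get();
    }

    // Counts the waiting nodes after the dummy head (single-threaded use only).
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node p = (h == null) ? null : h.next; p != null; p = p.next)
            n++;
        return n;
    }
}
```

Linking prev before the tail CAS and next only after it matches the ordering in AQS.enq. This is why unparkSuccessor(..), shown later, falls back to walking backwards from tail when next looks null.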

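Differences between the Fair and Non-fair lock() Implementations

The following analysis shows how ReentrantLock, built on AQS, implements fair and non-fair locking differently.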

	/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
	/**
     * Sync object for fair locks
     */
	static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

acquire() is a template method of AQS, shown below.

	/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

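tryAcquire(..) is a hook method that makes another attempt to take the lock. In AQS itself it only throws UnsupportedOperationException, and NonfairSync and FairSync each provide their own implementation. addWaiter(..) puts the thread into the blocking queue, and acquireQueued(..) then blocks it.

The next listing shows how tryAcquire(1) differs between FairSync and NonfairSync.

	abstract static class Sync extends AbstractQueuedSynchronizer {
		/**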
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
	}
	/**
     * Sync object for fair locks
     */
	static final class FairSync extends Sync {
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

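The two methods are nearly identical. The only difference is the extra !hasQueuedPredecessors() check in the second one. This check means the thread tries to grab the lock only when c == 0 (no thread holds the lock) and no other thread is queued ahead of it (the queue is empty or the current thread is first in line). Otherwise it keeps waiting in the queue. That is what makes it "fair"! The non-fair lock, by contrast, skips this check, and its lock() even tries a CAS on state before calling acquire(1).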
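The sketch below makes visible the queue that hasQueuedPredecessors() consults. The main thread holds a fair lock and starts a second thread that blocks in lock(). The main thread then inspects the queue with ReentrantLock's public monitoring methods hasQueuedThread(Thread) and getQueueLength(). The class name FairQueueDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairQueueDemo {
    // Returns the queue length observed while one thread waits for a held fair lock.
    static int queueLengthWhileWaiting() {
        ReentrantLock lock = new ReentrantLock(true);  // FairSync
        lock.lock();                                    // main thread owns the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                                // tryAcquire fails, so it is enqueued
            lock.unlock();
        });
        waiter.start();
        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();                         // wait until the waiter is enqueued
            }
            // A thread arriving now would find a queued predecessor, so the fair
            // tryAcquire(..) would refuse to let it barge in even if the lock were free.
            return lock.getQueueLength();
        } finally {
            lock.unlock();                              // hand the lock to the waiter
            try {
                waiter.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(queueLengthWhileWaiting()); // 1
    }
}
```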

Blocking Queue and Wake-up Mechanism

This brings us to the most critical part of the lock, the inside of acquireQueued(..).

	/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

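Start with addWaiter(..). It creates a Node for the current thread and appends it to the tail of the doubly linked list. Note that this only puts the Thread object into a queue. The thread itself is not blocked yet.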

	/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

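After addWaiter(..) has added the Thread object to the blocking queue, acquireQueued(..) does the rest of the work. Once a thread enters acquireQueued(..), it may stay blocked indefinitely. Even if another thread calls interrupt() on it, it does not leave the function. It returns from acquireQueued(..) only after another thread has released the lock and this thread has then acquired it.

Note: inside acquireQueued(..) the thread is blocked. The function returns at the moment the woken thread acquires the lock. At that point the first element of the queue is removed, and head moves forward by one node.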

	/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

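First, what does the return value of acquireQueued(..) mean? The function does not respond to interrupts, but it records whether any other thread sent it an interrupt while it was blocked. If so, it returns true. Otherwise, it returns false.

The following code relies on this return value: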

	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
	
	/**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

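When acquireQueued(..) returns true, selfInterrupt() is called. The thread interrupts itself, which sets its own interrupt flag to true. This compensation is needed because the interrupt received while blocked was not acted on. The flag was also cleared by Thread.interrupted() in parkAndCheckInterrupt(), shown below, so the thread restores it now. As a result, if the thread calls a blocking method such as sleep() inside the locked region, that method throws InterruptedException and the interrupt is honored.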
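The effect of selfInterrupt() can be observed from outside the lock. In this sketch, a thread blocks in lock() while the main thread holds the lock. The main thread interrupts it and then releases the lock. lock() does not throw, but once it returns, the waiting thread finds its interrupt flag set. The class name is illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptDemo {
    // Returns the waiting thread's interrupt flag as seen right after lock() returned.
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);
        lock.lock();                                  // main thread holds the lock
        Thread t = new Thread(() -> {
            lock.lock();                              // an interrupt does not make this throw
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        t.start();
        while (!lock.hasQueuedThread(t)) {
            Thread.yield();                           // wait until t is enqueued
        }
        t.interrupt();   // park() returns, but the acquire loop only records it and keeps trying
        lock.unlock();   // now t can acquire the lock
        try {
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagAfterLock()); // true
    }
}
```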

The blocking happens in the following function:

	/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

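The thread calls park() to block itself. park() returns once another thread wakes it, which happens in two cases:

  • Case 1: another thread calls unpark(Thread t).
  • Case 2: another thread calls t.interrupt(). Note that lock() does not respond to interrupts, but LockSupport.park() does.

Because LockSupport.park() can be woken by an interrupt, acquireQueued(..) is written as an infinite for loop. According to its documentation, park() may even return spuriously, for no reason at all. After waking, the thread checks whether it is first in line (its predecessor is head). If so, it tries to take the lock. If that fails, it blocks itself again. This repeats until it gets the lock.

After waking, the thread calls Thread.interrupted() to tell whether an interrupt woke it. In case 1 the call returns false. In case 2 it returns true and clears the interrupt flag.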
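The sketch below reproduces the shape of parkAndCheckInterrupt() and checks both cases. The method name is borrowed from AQS, and the class is illustrative. In the real lock, the permit or the interrupt comes from another thread. Here the current thread gives itself the permit or the interrupt before parking, which keeps the example deterministic and single-threaded. When the permit is already available or the thread is already interrupted, park() returns immediately.

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Same shape as AQS's parkAndCheckInterrupt(): park, then report and clear the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park();
        return Thread.interrupted();
    }

    // Case 1: an unpark permit is available, so park() returns at once with no interrupt.
    static boolean afterUnpark() {
        LockSupport.unpark(Thread.currentThread());
        return parkAndCheckInterrupt();
    }

    // Case 2: the interrupt flag is set, so park() returns at once and interrupted() reports it.
    static boolean afterInterrupt() {
        Thread.currentThread().interrupt();
        return parkAndCheckInterrupt();
    }

    public static void main(String[] args) {
        System.out.println(afterUnpark());    // false
        System.out.println(afterInterrupt()); // true
        System.out.println(Thread.currentThread().isInterrupted()); // false: the flag was cleared
    }
}
```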

Analysis of unlock()

With lock covered, we turn to the implementation of unlock. unlock does not distinguish between fair and non-fair locks.

/**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }
	/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
	protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
	/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

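release() does two things: tryRelease(..) releases the lock, and unparkSuccessor(..) wakes the successor in the queue.

One point in the code above deserves attention. Since this is an exclusive lock, only the thread that already holds it may call release(..). tryRelease(..) throws IllegalMonitorStateException for any other thread, so no other thread competes with the holder. Therefore, tryRelease(..) updates state without CAS. It simply writes back the decremented value with setState().

Note: this is not the case for the read lock of a read-write lock, that is, for releaseShared(..).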
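The ownership check in tryRelease(..) is what makes the plain setState() safe. In the sketch below, a second thread calls unlock() on a lock owned by the main thread. It gets IllegalMonitorStateException, and the owner's hold count is left untouched. The class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class UnlockByNonOwnerDemo {
    // Returns true if a non-owner's unlock() throws and the owner still holds the lock once.
    static boolean unlockFromOtherThreadThrows() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        lock.lock();                                  // main thread is the owner
        Thread intruder = new Thread(() -> {
            try {
                lock.unlock();                        // not the owner: tryRelease(..) rejects it
            } catch (IllegalMonitorStateException e) {
                threw.set(true);
            }
        });
        intruder.start();
        try {
            intruder.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stillHeld = lock.isHeldByCurrentThread() && lock.getHoldCount() == 1;
        lock.unlock();
        return threw.get() && stillHeld;
    }

    public static void main(String[] args) {
        System.out.println(unlockFromOtherThreadThrows()); // true
    }
}
```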

Analysis of lockInterruptibly()

The lock() method above cannot be interrupted, but lockInterruptibly() can. The code below shows how their implementations differ.

	public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
	/**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

acquireInterruptibly(..) is also an AQS template method. The tryAcquire(..) call inside it is implemented by FairSync and NonfairSync as before, so that part is not repeated here. The focus is on doAcquireInterruptibly(..).

	/**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // Key point: an interrupt was received, so stop waiting and throw right away
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

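Once acquireQueued(..) is understood, this method is simple. When parkAndCheckInterrupt() returns true, another thread has sent an interrupt. The method then throws InterruptedException immediately, which exits the for loop and the whole function. Because failed is still true, the finally block calls cancelAcquire(node) to cancel the queued node.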
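The sketch below shows the difference in behavior. The main thread holds the lock, and a worker blocks in lockInterruptibly(). Interrupting the worker makes it abandon the wait with InterruptedException instead of continuing to wait. The class name is illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptiblyDemo {
    // Returns true if the interrupted worker left lockInterruptibly() with InterruptedException.
    static boolean interruptedWhileWaiting() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean gotException = new AtomicBoolean(false);
        lock.lock();                          // main holds the lock for the whole demo
        Thread worker = new Thread(() -> {
            try {
                lock.lockInterruptibly();     // blocks: main holds the lock
                lock.unlock();                // not reached in this demo
            } catch (InterruptedException e) {
                gotException.set(true);       // the wait was abandoned
            }
        });
        worker.start();
        while (!lock.hasQueuedThread(worker)) {
            Thread.yield();                   // wait until the worker is enqueued
        }
        worker.interrupt();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return gotException.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptedWhileWaiting()); // true
    }
}
```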

Analysis of tryLock()

	/**
     * Acquires the lock only if it is not held by another thread at the time
     * of invocation.
     *
     * <p>Acquires the lock if it is not held by another thread and
     * returns immediately with the value {@code true}, setting the
     * lock hold count to one. Even when this lock has been set to use a
     * fair ordering policy, a call to {@code tryLock()} <em>will</em>
     * immediately acquire the lock if it is available, whether or not
     * other threads are currently waiting for the lock.
     * This &quot;barging&quot; behavior can be useful in certain
     * circumstances, even though it breaks fairness. If you want to honor
     * the fairness setting for this lock, then use
     * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
     * which is almost equivalent (it also detects interruption).
     *
     * <p>If the current thread already holds this lock then the hold
     * count is incremented by one and the method returns {@code true}.
     *
     * <p>If the lock is held by another thread then this method will return
     * immediately with the value {@code false}.
     *
     * @return {@code true} if the lock was free and was acquired by the
     *         current thread, or the lock was already held by the current
     *         thread; and {@code false} otherwise
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

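tryLock() calls nonfairTryAcquire(..), the non-fair version, regardless of the lock's fairness setting. It tries a CAS on state, or simply re-enters if the current thread already owns the lock. If that succeeds, the lock is acquired. If not, the method returns false immediately without blocking.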
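A short sketch can check this non-blocking behavior. While the main thread holds the lock, a second thread's tryLock() returns false immediately. The timed tryLock(long, TimeUnit) returns false once its timeout elapses. The class name is illustrative.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // While main holds the lock, returns {tryLock(), tryLock(50 ms)} as seen by another thread.
    static boolean[] attemptsWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];
        lock.lock();
        Thread other = new Thread(() -> {
            results[0] = lock.tryLock();          // fails at once, no queuing or parking
            try {
                results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS); // waits up to 50 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Neither attempt can succeed here, so there is nothing to unlock.
        });
        other.start();
        try {
            other.join(5000);                     // join() also publishes results[] to main
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return results;
    }
}
```

In real code, call unlock() only when tryLock() returned true, for example `if (lock.tryLock()) { try { ... } finally { lock.unlock(); } }`.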

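Read-Write Locks

Compared with a mutex, the key addition in a read-write lock (ReentrantReadWriteLock) is that readers no longer exclude one another. Before looking at how it works, consider the class hierarchy.

Class Hierarchy

As the figure shows, ReadWriteLock is an interface that consists of two Lock objects.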

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReentrantReadWriteLock class hierarchy

ReentrantReadWriteLock implements this interface and is used as follows:

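        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock rLock = readWriteLock.readLock();
        rLock.lock();
        try {
            // read the shared data; other readers may be here at the same time
        } finally {
            rLock.unlock();
        }
        Lock wLock = readWriteLock.writeLock();
        wLock.lock();
        try {
            // modify the shared data; no other reader or writer is here
        } finally {
            wLock.unlock();
        }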

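In other words, a ReadWriteLock is not locked directly. Instead, you obtain its read lock and write lock and call lock/unlock on each of them.

Basic Principles of the Read-Write Lock Implementation

On the surface, ReadLock and WriteLock are two locks, but in fact they are just two views of the same lock. What does "two views" mean? Think of a single lock whose users fall into two classes: reader threads and writer threads. Readers do not exclude each other, so several of them can hold the lock at once. Readers and writers exclude each other, and writers exclude each other.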
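The public tryLock() methods of the two views make these rules observable. In the sketch below, the main thread holds the read lock. A second thread can still take the read lock, but not the write lock. The class name is illustrative.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteCompatibility {
    // While main holds the read lock, returns {second reader succeeded, writer succeeded}.
    static boolean[] probeWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[2];
        rw.readLock().lock();                        // main thread is a reader
        Thread other = new Thread(() -> {
            results[0] = rw.readLock().tryLock();    // read/read: not exclusive
            if (results[0]) rw.readLock().unlock();
            results[1] = rw.writeLock().tryLock();   // read/write: exclusive
            if (results[1]) rw.writeLock().unlock();
        });
        other.start();
        try {
            other.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        rw.readLock().unlock();
        return results;
    }
}
```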

The constructor below also shows that readerLock and writerLock share the same sync object. As with the mutex, sync comes in fair and non-fair variants and extends AQS.

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * default (nonfair) ordering properties.
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

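Like the mutex, the read-write lock uses the state variable to represent the status of the lock, but here state means something entirely different. The inner class Sync redefines state as follows.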

	/**
     * Synchronization implementation for ReentrantReadWriteLock.
     * Subclassed into fair and nonfair versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    }

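In other words, state is split in half. The low 16 bits record the write lock. Only one thread can write at a time, so why does the write lock need 16 bits? Because a writer can re-enter many times. For example, a value of 5 in the low 16 bits means one writer has re-entered 5 times.

The high 16 bits record the read lock. For example, a value of 5 in the high 16 bits can mean that 5 readers each hold the lock, or that one reader has re-entered 5 times.

This design is clever. Why split one int in half instead of using two int variables for the read and write states? Because a single CAS cannot update two int variables at once. So the high and low 16 bits of one int hold the read and write lock states, respectively.

When state=0, no thread holds either the read lock or the write lock. When state!=0, either some thread holds the read lock or some thread holds the write lock. Because reads and writes are mutually exclusive, the two cannot both be held at the same time, except in one case discussed below: the write-lock holder may also acquire the read lock. To tell whether readers or a writer hold the lock, check sharedCount(state) and exclusiveCount(state).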
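The bit arithmetic can be checked in isolation. The sketch below copies the constants and the two helpers from the Sync listing above into a standalone class and decodes a few sample state values. StateSplit is a hypothetical name.

```java
class StateSplit {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds: the high 16 bits
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // Number of write holds: the low 16 bits
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        int readers = 3 * SHARED_UNIT;  // three read acquisitions, each adding SHARED_UNIT
        int writer = 2;                 // one writer that has acquired twice (reentrant)
        int both = readers + writer;    // both nonzero: the write-lock holder also holds read locks
        System.out.println(sharedCount(readers) + " " + exclusiveCount(readers)); // 3 0
        System.out.println(sharedCount(writer) + " " + exclusiveCount(writer));   // 0 2
        System.out.println(sharedCount(both) + " " + exclusiveCount(both));       // 3 2
        System.out.println(MAX_COUNT);                                            // 65535
    }
}
```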

The Two Pairs of AQS Template Methods

Next, consider how the two inner classes of ReentrantReadWriteLock, ReadLock and WriteLock, use state.

	public static class ReadLock implements Lock, java.io.Serializable {
    	public void lock() {
            sync.acquireShared(1);
        }
        
        public void unlock() {
            sync.releaseShared(1);
        }
	}
	public static class WriteLock implements Lock, java.io.Serializable {
    	public void lock() {
            sync.acquire(1);
        }
        
        public void unlock() {
            sync.release(1);
        }
    }

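acquire/release and acquireShared/releaseShared are the two pairs of template methods in AQS. The mutex and the write lock of the read-write lock are both implemented with acquire/release. The read lock is implemented with acquireShared/releaseShared. The code for these two pairs is: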

	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

	public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

	public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

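Combining read/write with fair/non-fair gives 4 combinations. As the figure shows, the tryAcquire(..) and tryAcquireShared(..) called by the template methods above are implemented in Sync. These Sync methods are themselves template methods. Their hooks, writerShouldBlock() and readerShouldBlock(), are implemented separately in NonfairSync and FairSync. The resulting mapping is:

  1. Fair read lock: Sync.tryAcquireShared() + the two hooks overridden in FairSync.
  2. Non-fair read lock: Sync.tryAcquireShared() + the two hooks overridden in NonfairSync.
  3. Fair write lock: Sync.tryAcquire() + the two hooks overridden in FairSync.
  4. Non-fair write lock: Sync.tryAcquire() + the two hooks overridden in NonfairSync.

Implementation of the four lock strategies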

    /**
     * Nonfair version of Sync
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * Fair version of Sync
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

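The code above shows the NonfairSync and FairSync classes in ReentrantReadWriteLock, which correspond to the four strategies. Each is explained below.

Fairness is easy to understand. For both the read lock and the write lock, if any other thread is queued, waiting for either the read lock or the write lock, the thread may not grab the lock directly and must join the end of the queue.

Non-fairness is implemented slightly differently for the read and write locks. Start with the write lock. A writer can grab the lock only in two situations. The first is state=0, meaning no other thread holds the read or write lock. The second is state!=0, but the thread holding the write lock is the writer itself, re-entering. A non-fair writer simply tries to grab the lock no matter who is waiting, which is why NonfairSync.writerShouldBlock() always returns false.

Can a reader also barge in the same way? No! Readers do not exclude each other. If readers currently hold the lock and further readers keep barging in, a writer might never get the lock. So the non-fair reader is given a "constraint": when the first thread in the queue is a writer, the reader blocks too instead of grabbing the lock unchecked. As the comment in readerShouldBlock() notes, this is only a heuristic.

With the strategies understood, the next step is to look at how the four implementations differ.

WriteLock: Fair and Non-fair Implementations

The write lock is an exclusive lock. It is implemented much like the mutex, by overriding tryAcquire/tryRelease.

Analysis of tryAcquire()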

		protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // There can be only one writer, but it may re-enter many times
            int w = exclusiveCount(c);
            // c != 0 means a reader or a writer holds the lock
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // w == 0: the lock is held by readers, so fail
                // w != 0 but the write-lock owner is another thread: also fail
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // The 16 bits are used up: maximum reentrancy count exceeded
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() || // writerShouldBlock() is where the fair/non-fair strategies differ
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

The code above breaks down as follows:

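  1. if (c != 0) and w == 0: a reader must be holding the lock, so the write lock cannot be acquired. Return false.

  2. if (c != 0) and w != 0: a writer must be holding the lock. If the check current != getExclusiveOwnerThread() finds that the owner thread is not the current thread, return false.

  3. Execution reaches if (w + exclusiveCount(acquires) > MAX_COUNT) only when c != 0, w != 0 and current == getExclusiveOwnerThread(). This check throws an Error if the reentrancy count would exceed the maximum. The write-lock reentrancy count is stored in the low 16 bits of state, so MAX_COUNT is 2^16 - 1 = 65535. Going beyond it would carry into the high 16 bits that hold the read-lock count, and the check prevents that. In practice, a lock is hardly ever re-entered that many times.

  4. if (c == 0): no reader or writer currently holds the lock. If writerShouldBlock() allows it, the thread tries to grab the lock with CAS:

    		if (writerShouldBlock() || // writerShouldBlock() is where the fair/non-fair strategies differ
                    !compareAndSetState(c, c + acquires))
    

    If the CAS succeeds, setExclusiveOwnerThread(current) sets the owner thread to the current thread.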
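The limit from step 3 can be observed directly. The sketch below keeps re-entering a write lock until the Error is thrown, then releases every successful acquisition. Catching Error is done here only for demonstration, and the class name is illustrative.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteReentryLimit {
    // Re-enters the write lock until tryAcquire(..) throws, then releases every hold.
    // Returns the number of successful acquisitions.
    static int maxWriteHolds() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        int held = 0;
        try {
            while (true) {
                rw.writeLock().lock();
                held++;
            }
        } catch (Error e) {
            // "Maximum lock count exceeded": one more hold would overflow the low 16 bits.
            // The Error is thrown before setState(), so state is still consistent.
        } finally {
            for (int i = 0; i < held; i++) {
                rw.writeLock().unlock();
            }
        }
        return held;
    }

    public static void main(String[] args) {
        System.out.println(maxWriteHolds()); // 65535, i.e. MAX_COUNT
    }
}
```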

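The fair and non-fair implementations are almost identical. The only difference is writerShouldBlock(), which FairSync and NonfairSync implement separately, as covered in the previous section.

Analysis of tryRelease(..)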

		protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            // The write lock is exclusive: while the current thread holds it, no other thread
            // holds the write lock or the read lock, so state is updated without CAS.
            setState(nextc);
            return free;
        }

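ReadLock: Fair and Non-fair Implementations

The read lock is a shared lock. It overrides tryAcquireShared/tryReleaseShared, and its strategy differs considerably from that of the exclusive lock.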
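For a minimal example of overriding the shared-mode hooks, here is a one-shot latch modeled on the BooleanLatch example in the AQS Javadoc. It is simplified to use the uninterruptible acquireShared(int). A negative return value from tryAcquireShared(..) makes the caller queue and park. A single releaseShared(..) lets every waiter through, which is the key difference from exclusive mode.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class BooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        // >= 0: acquired; negative: acquireShared() enqueues the caller and parks it.
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        // Returning true lets releaseShared() wake the queued shared waiters.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() { return sync.isSignalled(); }
    void signal()         { sync.releaseShared(1); }
    void await()          { sync.acquireShared(1); }   // uninterruptible wait

    // Starts n waiters, signals once, and returns how many waiters finished.
    static int releaseAll(int n) {
        BooleanLatch latch = new BooleanLatch();
        Thread[] waiters = new Thread[n];
        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(latch::await);
            waiters[i].start();
        }
        latch.signal();                  // a single release lets every waiter through
        int finished = 0;
        for (Thread t : waiters) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (!t.isAlive()) finished++;
        }
        return finished;
    }
}
```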

Analysis of tryAcquireShared(..)

		protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // The write lock is held by another thread, so the read lock cannot be acquired: fail
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() && // this hook is where fair and non-fair differ
                r < MAX_COUNT &&
                // Take the read lock with CAS: add 1 to the high 16 bits
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) { // r was 0, so this thread is the first to take the read lock
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) { // this thread is the first reader, re-entering
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

Code analysis:

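  1. 	if (exclusiveCount(c) != 0 &&
                  getExclusiveOwnerThread() != current)
                  return -1;
    

    If the low 16 bits are nonzero, a writer holds the lock, but -1 is returned only when the owner thread is not the current thread. The implication is that if current == ownerThread, this code does not return, because a thread holding the write lock may also take the read lock! In other words, a thread that holds the WriteLock can still call ReadLock.lock().

  2. The compareAndSetState(c, c + SHARED_UNIT) call above adds 1 to the high 16 bits of state, which hold the read-lock count. Because the count lives in the high 16 bits, the 1 must be shifted left by 16 bits (SHARED_UNIT = 1 << 16) before it is added.

  3. Variables such as firstReader and cachedHoldCounter are bookkeeping. Some of ReentrantReadWriteLock's query methods use them, for example getReadHoldCount(), which reports how many times the current thread holds the read lock. They are not central to the read/write exclusion mechanism, so they are not covered further here.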
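The rule in item 1, that a write-lock holder may also take the read lock, is what allows lock downgrading. In the sketch below, the thread takes the write lock, then the read lock, then releases the write lock and keeps reading. The reverse, upgrading from read to write, fails. writeLock().tryLock() is used for the upgrade attempt because writeLock().lock() would block forever on the thread's own read hold. The class name is illustrative.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockDowngradeDemo {
    // Returns {read lock taken while holding the write lock,
    //          write lock obtained while holding only the read lock}.
    static boolean[] downgradeThenTryUpgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[2];

        rw.writeLock().lock();
        rw.readLock().lock();        // allowed: the write-lock owner may also take the read lock
        results[0] = rw.isWriteLockedByCurrentThread() && rw.getReadHoldCount() == 1;
        rw.writeLock().unlock();     // downgrade: only the read lock is held now

        // Upgrade attempt: tryLock() applies the same c != 0 && w == 0 check and fails.
        results[1] = rw.writeLock().tryLock();
        if (results[1]) rw.writeLock().unlock();
        rw.readLock().unlock();
        return results;
    }
}
```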

Analysis of tryReleaseShared(..)

		protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // Bookkeeping (statistics) variables
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            // Retry in a for loop with CAS until the update succeeds
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

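Because the read lock is shared, several threads may hold it at once and release it concurrently, so the release cannot simply write the decremented value back; it must retry in a for loop with CAS. This is the fundamental difference between tryReleaseShared and tryRelease, where only the owning thread releases and a plain setState is enough.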
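A minimal model of that retry loop, assuming an AtomicInteger in place of the AQS state field; SharedReleaseModel is a hypothetical name, not a JDK class. Each releaser rereads state and retries whenever another releaser's CAS got in first, so no decrement is lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: the read count lives in the high 16 bits of an int.
final class SharedReleaseModel {
    static final int SHARED_UNIT = 1 << 16;
    final AtomicInteger state = new AtomicInteger();

    void acquireShared() {
        for (;;) {
            int c = state.get();
            if (state.compareAndSet(c, c + SHARED_UNIT)) return;
        }
    }

    /** Same shape as tryReleaseShared: returns true when the last read hold is gone. */
    boolean releaseShared() {
        for (;;) {
            int c = state.get();
            int nextc = c - SHARED_UNIT;
            if (state.compareAndSet(c, nextc))   // another releaser got in first? reread and retry
                return nextc == 0;
        }
    }
}
```

With a plain read followed by a plain write instead of compareAndSet, two concurrent releasers could both read the same c, and one of the two decrements would be lost.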

Condition

The relationship between Condition and Lock

Condition is itself an interface, and its functionality is similar to wait/notify, as shown below.

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    
    void signal();
    void signalAll();
}

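When covering multithreading basics, we stressed that wait()/notify() must be used together with synchronized. Condition is no different: it must be used together with a Lock. That is why the Lock interface has a Condition-related method: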

public interface Lock {
    Condition newCondition();
}
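A quick check of the "must be used with a Lock" rule, using only ReentrantLock and Condition: signal() on a ReentrantLock condition throws IllegalMonitorStateException when the caller does not hold the lock. ConditionNeedsLock is a hypothetical helper class.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper demonstrating the ownership check in ConditionObject.signal().
final class ConditionNeedsLock {
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();               // the caller does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal();               // no waiters, so this is a no-op, but it is legal
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```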

Use cases for Condition

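Before explaining how Condition is implemented, let's use ArrayBlockingQueue as an example of where Condition is used. Below is a blocking queue backed by an array: in put(..), if the queue is full, the producer thread blocks; in take(), if the queue is empty, the consumer thread blocks.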

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;
    
    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // put: the queue is full, so block on the "not full" condition
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // take: the queue is empty, so block on the "not empty" condition
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}

How Condition is implemented

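As you can see, Condition is simple to use. Because producers and consumers wait on separate conditions (notFull and notEmpty), it avoids the wait/notify problem of a producer waking another producer or a consumer waking another consumer.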
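The two-condition pattern can be reduced to a small bounded buffer. This is a sketch in the spirit of ArrayBlockingQueue and of the BoundedBuffer example in the Condition Javadoc, not a copy of either; the class name and the use of lock() instead of lockInterruptibly() are simplifications made here.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal bounded buffer: one lock, two conditions.
final class BoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition(); // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedBuffer(int capacity) { items = new Object[capacity]; }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();            // wakes a consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();             // wakes a producer, never another consumer
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```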

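Because Condition must be used together with a Lock, the implementation of Condition is also part of the Lock. Let's first look at how Condition objects are constructed in the mutex lock and in the read-write lock.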

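First, the ReadLock of the read-write lock does not support Condition (its newCondition() throws UnsupportedOperationException), while both the read-write lock's WriteLock and the mutex lock do. Each of them calls its own inner class Sync, but every Sync extends AQS, so sync.newCondition() ultimately calls new ConditionObject() in AQS in both cases.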
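These claims can be checked from outside through the public API. The sketch below (hypothetical helper class ConditionSources) tests whether the returned Condition is an AbstractQueuedSynchronizer.ConditionObject, a public nested class of AQS, and whether ReadLock.newCondition() throws UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical helper: where do Condition objects come from?
final class ConditionSources {
    static boolean isAqsCondition(Condition c) {
        return c instanceof AbstractQueuedSynchronizer.ConditionObject;
    }

    static boolean mutexCondition() {
        return isAqsCondition(new ReentrantLock().newCondition());
    }

    static boolean writeLockCondition() {
        return isAqsCondition(new ReentrantReadWriteLock().writeLock().newCondition());
    }

    static boolean readLockRejectsCondition() {
        try {
            new ReentrantReadWriteLock().readLock().newCondition();
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```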

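Multiple threads can block on a single Condition object, so ConditionObject also keeps a queue internally: a singly linked list whose nodes are chained through Node.nextWaiter, delimited by firstWaiter and lastWaiter, as shown below.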

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        
    	public ConditionObject() { }
    }
}

await() implementation analysis

		/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            // Interrupted just as await() starts: throw the exception
            if (Thread.interrupted())
                throw new InterruptedException();
            // Join the Condition's wait queue
            Node node = addConditionWaiter();
            // Key point: the lock must be released before blocking on the Condition, otherwise deadlock
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // Park (block) the current thread
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // Reacquire the lock
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // Woken by an interrupt: throw InterruptedException or re-set the flag, per interruptMode
                reportInterruptAfterWait(interruptMode);
        }

A few key points about await():

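  1. A thread calling await() must already hold the lock. Therefore, inside addConditionWaiter(), operations on this linked list need no CAS; they are thread-safe by construction. The code is as follows.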

    		/**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
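  2. Before the thread starts waiting, it must release the lock, via fullyRelease(node); otherwise a deadlock would occur. fullyRelease releases all reentrant holds at once and returns their number as savedState. This is the same mechanism by which wait/notify cooperates with synchronized.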

  3. After being woken from await, the thread must reacquire the lock with acquireQueued(node, savedState), which restores the saved hold count.

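  4. checkInterruptWhileWaiting(node) comes right after park(this) to detect whether an interrupt arrived during the park. When a thread returns from park, there are two possibilities: another thread called unpark, or the thread was interrupted. This await() responds to interrupts, so when it finds that it was woken by an interrupt rather than by unpark, it exits the while loop immediately; await() then reacquires the lock and reports the interrupt.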

  5. isOnSyncQueue(node) checks whether the node is in AQS's sync queue. Initially the node is only in the Condition queue, not in the AQS queue; a signal operation moves it into the AQS sync queue.
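Points 2 and 3 can be observed with a sketch that uses only the public ReentrantLock API (the class name AwaitReleasesLock is hypothetical). The waiter holds the lock twice before calling await(); the main thread can still acquire the lock, which shows that fullyRelease() dropped both holds, and after await() returns the waiter's hold count is back to 2.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await() fully releases a reentrant lock and restores the hold count.
final class AwaitReleasesLock {
    static int holdCountAfterAwait() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        boolean[] ready = {false};          // guarded by lock
        int[] holdAfter = {-1};
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                    // reentrant: hold count is now 2
            try {
                aboutToWait.countDown();
                while (!ready[0])
                    cond.await();           // fullyRelease() drops both holds at once
                holdAfter[0] = lock.getHoldCount(); // acquireQueued() restored savedState
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        aboutToWait.await();
        lock.lock();                        // succeeds only once the waiter released inside await()
        try {
            ready[0] = true;
            cond.signal();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return holdAfter[0];
    }
}
```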

awaitUninterruptibly() implementation analysis

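Unlike await(), awaitUninterruptibly() does not respond to interrupts, and its signature declares no InterruptedException. Let's look at how its implementation differs from await().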

		/**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

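The overall code is similar to await(). The difference is that when the thread is interrupted, it throws no exception; it only records the interrupt and keeps looping. After reacquiring the lock, it restores the thread's interrupt status with selfInterrupt().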
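A sketch of that difference, assuming only the public API (UninterruptibleDemo is a hypothetical name): the waiter is interrupted while in awaitUninterruptibly(), keeps waiting until it is signaled, and afterwards still sees its interrupt flag set. ReentrantLock.hasWaiters(Condition) is used only so the main thread knows the waiter has released the lock and is waiting.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt neither ends awaitUninterruptibly() nor gets lost.
final class UninterruptibleDemo {
    static boolean interruptFlagSurvives() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false};          // guarded by lock
        boolean[] flagAfter = {false};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0])
                    cond.awaitUninterruptibly();  // an interrupt is only recorded
                flagAfter[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (true) {                      // wait until the waiter sits in the condition queue
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) break;
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        waiter.interrupt();                 // with await() this would cause InterruptedException
        lock.lock();
        try {
            ready[0] = true;
            cond.signal();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return flagAfter[0];
    }
}
```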

signal() implementation analysis

		/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

		/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
	/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

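As with await(), the calling thread must hold the lock when it calls signal() (otherwise the IllegalMonitorStateException above is thrown). It can hold the lock because the waiting thread released it inside await().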

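Next, signal() takes firstWaiter off the condition queue. transferForSignal() first uses enq(node) to put this node into the sync queue of the AQS lock, and calls unpark directly only if the predecessor is cancelled or its waitStatus cannot be set to SIGNAL; otherwise the thread stays parked until the thread ahead of it in the sync queue releases the lock and wakes it. This is why await() loops on while (!isOnSyncQueue(node)): once the node is on the sync queue, the waiting thread knows it was signaled rather than interrupted.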

Once signal() is understood, signalAll() works in a similar way and is not repeated here.
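The hand-off between the two queues can be pictured with a toy model. TwoQueueModel is hypothetical and ignores threads, cancellation and waitStatus; it only shows that signal() moves the longest waiter (the head of the condition queue) to the tail of the lock's sync queue, and that signalAll() repeats the same transfer until the condition queue is empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not AQS code): thread names stand in for nodes.
final class TwoQueueModel {
    final Deque<String> conditionQueue = new ArrayDeque<>();
    final Deque<String> syncQueue = new ArrayDeque<>();

    /** Like addConditionWaiter(): append to the condition queue. */
    void await(String thread) { conditionQueue.addLast(thread); }

    /** Like doSignal() + transferForSignal() + enq(): move the head to the sync-queue tail. */
    void signal() {
        String first = conditionQueue.pollFirst();
        if (first != null) syncQueue.addLast(first);
    }

    /** signalAll(): the same transfer, repeated until no waiter is left. */
    void signalAll() {
        while (!conditionQueue.isEmpty()) signal();
    }
}
```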

StampedLock

Why StampedLock was introduced

StampedLock was added in JDK 8. Since read-write locks already exist, why introduce StampedLock as well? Let's compare them.

Comparison of the concurrency of the three locks

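As the comparison shows, concurrency increases step by step from ReentrantLock to ReentrantReadWriteLock to StampedLock. How does StampedLock let reads and writes proceed concurrently without excluding each other? The book 《软件架构设计:大型网站技术架构与业务架构融合之道》 discusses MVCC, the core mechanism behind MySQL's high concurrency, which keeps multiple versions of the same data; StampedLock works on a similar idea.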
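The comparison can be reproduced with the public JDK APIs. In the sketch below (hypothetical class ConcurrencyComparison), the main thread holds one kind of access and a probe on another thread tries to get in; the probe runs on a different thread so that reentrancy cannot hide the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BooleanSupplier;

// Hypothetical probes comparing what each lock lets a second thread do.
final class ConcurrencyComparison {
    static boolean onOtherThread(BooleanSupplier probe) {
        return CompletableFuture.supplyAsync(probe::getAsBoolean).join();
    }

    /** ReentrantLock: while one thread holds it, nobody else gets in, reader or not. */
    static boolean mutexAllowsSecondReader() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            return onOtherThread(() -> { boolean ok = lock.tryLock(); if (ok) lock.unlock(); return ok; });
        } finally {
            lock.unlock();
        }
    }

    /** ReentrantReadWriteLock while reading: returns {second reader ok, writer ok}. */
    static boolean[] rwWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean readOk = onOtherThread(() -> {
                boolean ok = rw.readLock().tryLock(); if (ok) rw.readLock().unlock(); return ok; });
            boolean writeOk = onOtherThread(() -> {
                boolean ok = rw.writeLock().tryLock(); if (ok) rw.writeLock().unlock(); return ok; });
            return new boolean[] {readOk, writeOk};
        } finally {
            rw.readLock().unlock();
        }
    }

    /** StampedLock: an optimistic read holds nothing, so a writer gets in; validate() notices. */
    static boolean[] stampedWhileOptimisticReading() {
        StampedLock sl = new StampedLock();
        long stamp = sl.tryOptimisticRead();
        boolean writeOk = onOtherThread(() -> {
            long w = sl.tryWriteLock(); if (w != 0L) sl.unlockWrite(w); return w != 0L; });
        return new boolean[] {writeOk, sl.validate(stamp)};
    }
}
```

Expected pattern: the mutex admits nobody, the read-write lock admits a second reader but not a writer, and StampedLock admits a writer during an optimistic read, with validate() then returning false.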

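On the other hand, ReentrantReadWriteLock uses a "pessimistic read" strategy: once the first reader holds the lock, a second and a third reader can still acquire it, so a writer may never get the lock and can "starve". Both its fair and nonfair implementations try to avoid this, but it can still happen. StampedLock introduces an "optimistic read" strategy: a read takes no read lock at all, and only if the data turns out to have been modified is it upgraded to a "pessimistic read". This effectively lowers the priority of reads, tipping the balance of lock contention toward writes so that writers do not starve.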

Usage scenario

Let's start with an official example of how StampedLock is used.

[Code figure: the official StampedLock example, a Point class with move() and distanceFromOrigin()]
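A sketch of that example, following the Point class in the StampedLock Javadoc but abridged to the two methods discussed here; the step-numbering comments are added for this chapter.

```java
import java.util.concurrent.locks.StampedLock;

// Abridged from the Point example in the StampedLock Javadoc.
class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    /** Writers take the exclusive write lock, exactly as with ReadWriteLock. */
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /** Readers try an optimistic read first and fall back to a pessimistic read lock. */
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();     // 1. snapshot the version
        double currentX = x, currentY = y;       // 2. copy the fields into locals
        if (!sl.validate(stamp)) {               // 3. a writer intervened: reread under a read lock
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
```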

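As the code shows, there is a Point class: several threads call move() to change the coordinates, while others call distanceFromOrigin() to compute the distance from the origin.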

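First, move() takes the write lock. This is no different from ReadWriteLock: writes still exclude each other. The key is the read side, which uses an "optimistic read", sl.tryOptimisticRead(), which amounts to taking a "snapshot" of the lock state before reading. The data is then copied into local variables, and before they are used, the version number is checked again. If it has changed, another thread modified the data during the read; the values just read are discarded and are read again under a read lock. The key code is these three lines: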

    long stamp = sl.tryOptimisticRead();
    double currentX = x, currentY = y;
    if (!sl.validate(stamp)) {

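Note that these three lines are very sensitive to ordering and must not be reordered. The read of state inside tryOptimisticRead() is a volatile read, so the later reads of x and y cannot move before it; but x, y and stamp are not volatile, so nothing by itself stops the reads of x and y from moving after the read of state inside validate(). To prevent this, validate(stamp) inserts a memory barrier before reading state.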

How the "optimistic read" is implemented

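First, StampedLock is a read-write lock, so, like ReentrantReadWriteLock, it splits one state variable into two parts that represent the read-lock and write-lock status. It also needs a version number for the data. But, as mentioned earlier, a single CAS cannot operate on two variables, so the same state variable also serves as the data's version number. Let's analyze the state variable first.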

public class StampedLock implements java.io.Serializable {
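    /** The number of bits to use for reader count before overflowing */
    private static final int LG_READERS = 7;

    // Values for lock state and stamp operations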
    private static final long RUNIT = 1L;
    private static final long WBIT  = 1L << LG_READERS;
    private static final long RBITS = WBIT - 1L;
    private static final long RFULL = RBITS - 1L;
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    // Initial value for lock state; avoid failure value zero
    private static final long ORIGIN = WBIT << 1;
    
    /** Lock sequence/state */
    private transient volatile long state;
}

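Combining the code and the figure: the lowest 8 bits hold the read and write status. The 8th bit (WBIT = 1 << 7) represents the write lock, and the lowest 7 bits represent the read lock (the reader count). Because the write lock has only one bit, the write lock is not reentrant.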

Values of the state variable in different lock states
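The constants can be recomputed to make the layout concrete. This sketch assumes LG_READERS = 7, as in JDK 8; StampedBits is a hypothetical class that only restates the arithmetic.

```java
// Hypothetical restatement of StampedLock's bit constants (JDK 8 values assumed).
final class StampedBits {
    static final int  LG_READERS = 7;
    static final long RUNIT  = 1L;                 // one reader
    static final long WBIT   = 1L << LG_READERS;   // 128: the 8th bit is the write bit
    static final long RBITS  = WBIT - 1L;          // 127: the low 7 bits count readers
    static final long RFULL  = RBITS - 1L;         // 126: beyond this, readers go to an overflow counter
    static final long ABITS  = RBITS | WBIT;       // 255: all lock-status bits
    static final long SBITS  = ~RBITS;             // everything except the reader bits
    static final long ORIGIN = WBIT << 1;          // 256: initial state, never 0

    static long readers(long s)        { return s & RBITS; }
    static boolean writeLocked(long s) { return (s & WBIT) != 0L; }
}
```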

The initial value is not 0: it is WBIT shifted left by one bit, i.e. the ORIGIN constant above. The constructor is shown below.

	/**
     * Creates a new lock, initially in unlocked state.
     */
    public StampedLock() {
        state = ORIGIN;
    }

Why not set the initial value of state to 0? The answer lies in how the optimistic read is implemented.

	/**
     * Returns a stamp that can later be validated, or zero
     * if exclusively locked.
     *
     * @return a stamp, or zero if exclusively locked
     */
    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

	/**
     * Returns true if the lock has not been exclusively acquired
     * since issuance of the given stamp. Always returns false if the
     * stamp is zero. Always returns true if the stamp represents a
     * currently held lock. Invoking this method with a value not
     * obtained from {@link #tryOptimisticRead} or a locking method
     * for this lock has no defined effect or result.
     *
     * @param stamp a stamp
     * @return {@code true} if the lock has not been exclusively acquired
     * since issuance of the given stamp; else false
     */
    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }

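These two functions must be read together. When state & WBIT != 0, some thread holds the write lock, and tryOptimisticRead() always returns 0. The following validate(stamp), i.e. validate(0), then always returns false. This is exactly the intended logic: a stamp obtained while a thread holds the write lock never validates, whether or not the writer has since released the lock. validate(0) compares 0 with state & SBITS, and state & SBITS is never 0: state starts at ORIGIN, and when the version wraps around, unlockWrite() resets it to ORIGIN rather than to 0. Hence validate(0) is always false.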

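Why does validate(..) compare stamp & SBITS with state & SBITS instead of comparing stamp == state directly? Because reads do not exclude other reads. Even if state changes during an "optimistic read", validate(..) still returns true as long as only the low 7 bits (the reader count) changed.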
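The behavior described in the last two paragraphs can be checked in a single thread with the real StampedLock API; ValidateBehavior is a hypothetical helper class.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical checks of tryOptimisticRead()/validate() semantics.
final class ValidateBehavior {
    /** A read lock taken after the stamp changes only the reader bits: still valid. */
    static boolean survivesReader() {
        StampedLock sl = new StampedLock();
        long stamp = sl.tryOptimisticRead();
        long r = sl.readLock();
        sl.unlockRead(r);
        return sl.validate(stamp);
    }

    /** A write lock, even once released, advances the version: no longer valid. */
    static boolean survivesWriter() {
        StampedLock sl = new StampedLock();
        long stamp = sl.tryOptimisticRead();
        long w = sl.writeLock();
        sl.unlockWrite(w);
        return sl.validate(stamp);
    }

    /** While write-locked, tryOptimisticRead() hands out 0. */
    static long optimisticStampWhileWriteLocked() {
        StampedLock sl = new StampedLock();
        long w = sl.writeLock();
        try {
            return sl.tryOptimisticRead();
        } finally {
            sl.unlockWrite(w);
        }
    }

    /** validate(0) is false because state & SBITS is never 0. */
    static boolean validatesZero() {
        return new StampedLock().validate(0L);
    }
}
```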

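One more point: validate() issues the memory barrier U.loadFence() before it reads state. x, y and the locals currentX, currentY are ordinary, non-volatile variables, so without the fence the reads currentX = x and currentY = y could be reordered after the read of state inside validate(), and validate() could approve values that were read after a writer had started. The fence forbids that reordering.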

This analysis shows how cleverly state is designed: a single variable records both the read/write lock status and the data's version number.

Pessimistic read/write: how the "blocking" and "spinning" strategies differ

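Like ReadWriteLock, StampedLock also performs pessimistic read-lock and write-lock operations. However, it is not built on AQS; it reimplements its own wait queue internally, as shown below.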

public class StampedLock implements java.io.Serializable {
    /** Wait nodes */
    static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;
}

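This wait queue is very similar to the one in AQS. Initially whead = wtail = null; on initialization an empty node is created and both whead and wtail point to it, after which reader and writer nodes are appended one by one. The lock-scheduling policy built on this queue, however, differs a lot from AQS: it spins. In AQS, a thread whose CAS on state fails promptly joins the wait queue and blocks. In StampedLock, a thread whose CAS on state fails keeps spinning, and only if it still has not obtained the lock after enough spins does it block. For this purpose, the spin-count constants are defined based on the number of CPU cores: on a single-core CPU spinning is pointless, because the lock holder cannot run while the spinner occupies the only core, so spinning is used only on multi-core machines.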

    /** Number of processors, for spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** Maximum number of retries before enqueuing on acquisition */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
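The spin-then-block idea, stripped of the queue, can be sketched as follows. SpinThenParkLock is hypothetical and deliberately simplified: StampedLock parks and waits to be woken by release(), whereas this toy parks with a short timeout and retries, so it can never miss a wake-up. Only the SPINS rule is taken from StampedLock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical toy lock: spin first (multicore only), then back off by parking.
final class SpinThenParkLock {
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;   // same rule as StampedLock
    private final AtomicBoolean held = new AtomicBoolean();

    void lock() {
        for (int spins = SPINS; ; ) {
            if (!held.get() && held.compareAndSet(false, true))
                return;                                   // got the lock
            if (spins > 0)
                --spins;                                  // keep spinning: no context switch
            else
                LockSupport.parkNanos(this, TimeUnit.MICROSECONDS.toNanos(50)); // give up the CPU
        }
    }

    void unlock() { held.set(false); }
}
```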

Let's take write locking, i.e. StampedLock's writeLock() function, as an example of how spinning is implemented.

    /**
     * Exclusively acquires the lock, blocking if necessary
     * until available.
     *
     * @return a stamp that can be used to unlock or convert mode
     */
    public long writeLock() {
        long s, next;  // bypass acquireWrite in fully unlocked case only
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }

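As shown above, state & ABITS == 0 means that no thread holds either the read lock or the write lock, and only then may the current thread try to CAS state. If the CAS fails, acquireWrite() is called, which enters the wait queue and spins. This function is the core of the whole locking operation; its code is as follows.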
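The fast path can be modeled on an AtomicLong. This is a sketch assuming the JDK 8 bit layout (WBIT = 1 << 7, ORIGIN = 256); WriteFastPath and tryFastWriteLock() are hypothetical names, and returning 0 stands for "fall back to acquireWrite()".

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of writeLock()'s fast path.
final class WriteFastPath {
    static final long WBIT = 1L << 7, RBITS = WBIT - 1L, ABITS = RBITS | WBIT, ORIGIN = WBIT << 1;
    final AtomicLong state = new AtomicLong(ORIGIN);

    /** Returns the new stamp, or 0 when the caller would have to take the slow path. */
    long tryFastWriteLock() {
        long s = state.get();
        if ((s & ABITS) == 0L && state.compareAndSet(s, s + WBIT))  // no readers, no writer
            return s + WBIT;
        return 0L;   // readers or a writer present, or the CAS lost: spin/enqueue instead
    }
}
```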

	/**
     * See above for explanation.
     *
     * @param interruptible true if should check interrupts and if so
     * return INTERRUPTED
     * @param deadline if nonzero, the System.nanoTime value to timeout
     * at (and return zero)
     * @return next state, or INTERRUPTED
     */
    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        // Spin while enqueuing
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    // Got the lock while spinning: return
                    return ns;
            }
            else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    // Keep spinning, decrementing spins with some probability
                    --spins;
            }
            else if ((p = wtail) == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                // The loop's only break: exit once the CAS on wtail succeeds (node appended at the tail)
                break;
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) { // try to get the lock again
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0) // keep spinning
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                while ((c = h.cowait) != null) { // help wake every reader thread
                                                 // chained on the head's cowait list
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        // Block; woken later by another thread's release(), then continue the for loop
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

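acquireWrite(..) consists of two big for loops that implement a rather complex spinning strategy. The goal of the first loop is to append the node to the tail of the queue, trying to get the lock via CAS along the way. If it gets the lock, the function returns; if not, it keeps spinning until the node has been appended to the tail.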

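In the second loop, the node is already at the tail. If it finds that its predecessor is the head (whead == p), the only thing ahead of the current thread is the empty head node. It then starts a new round of spinning, doubling the spin count each round up to MAX_HEAD_SPINS, and then blocks. A key point: when release(..) is called, it wakes the first node after the head, and that thread resumes the second for loop right after its park() call.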

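Another difference from AQS's wait queue is that each WNode has a cowait pointer that links reader threads together. For example, if the tail of the queue is blocked reader 1 and readers 2 and 3 arrive, they are chained to 1 through cowait. When 1 is woken, 2 and 3 are woken along with it, because reads do not exclude each other.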
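A toy model of that grouping, with no threads involved (CowaitModel is hypothetical): a reader that arrives while the tail node is a reader node joins that node's cowait list, so waking the node releases the whole group at once.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not StampedLock code) of how readers share one queue node.
final class CowaitModel {
    static final class Node {
        final boolean reader;
        final String first;                               // thread that created the node
        final List<String> cowait = new ArrayList<>();    // readers riding on this node
        Node(boolean reader, String first) { this.reader = reader; this.first = first; }
        int threads() { return 1 + cowait.size(); }
    }

    final List<Node> queue = new ArrayList<>();

    void enqueue(String thread, boolean reader) {
        Node tail = queue.isEmpty() ? null : queue.get(queue.size() - 1);
        if (reader && tail != null && tail.reader)
            tail.cowait.add(thread);                      // chain onto the tail reader node
        else
            queue.add(new Node(reader, thread));          // writers always get their own node
    }

    /** Waking the first node wakes every thread chained on it; returns how many. */
    int wakeFirst() {
        return queue.isEmpty() ? 0 : queue.remove(0).threads();
    }
}
```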

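With the spinning strategy for locking understood, let's look at releasing the lock. As with the read-write lock, it does two things: it clears the write bit in state (which, as unlockWrite() shows, also advances the version), and it wakes the first node in the wait queue.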

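Once woken, this node continues the second for loop above, spinning to get the lock. If it succeeds, it leaves the queue; if not, it blocks again and waits to be woken the next time.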

	/**
     * If the lock state matches the given stamp, releases the
     * exclusive lock.
     *
     * @param stamp a stamp returned by a write-lock operation
     * @throws IllegalMonitorStateException if the stamp does
     * not match the current state of this lock
     */
    public void unlockWrite(long stamp) {
        WNode h;
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        // Release the lock: clear the write bit and advance the version (wrapping to ORIGIN)
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        // Wake the first node in the queue
        if ((h = whead) != null && h.status != 0)
            release(h);
    }

	/**
     * Wakes up the successor of h (normally whead). This is normally
     * just h.next, but may require traversal from wtail if next
     * pointers are lagging. This may fail to wake up an acquiring
     * thread when one or more have been cancelled, but the cancel
     * methods themselves provide extra safeguards to ensure liveness.
     */
    private void release(WNode h) {
        if (h != null) {
            WNode q; Thread w;
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }
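The state arithmetic in unlockWrite() can be replayed on plain longs, assuming the JDK 8 constants; UnlockWriteMath is a hypothetical class. Adding WBIT to a write stamp carries out of the write bit, which clears it and bumps the version bits above it; a wrap-around to 0 is replaced by ORIGIN, which is why state & SBITS is never 0.

```java
// Hypothetical replay of unlockWrite()'s state update.
final class UnlockWriteMath {
    static final long WBIT = 1L << 7, ORIGIN = WBIT << 1;

    /** State after releasing the write lock identified by stamp (which has WBIT set). */
    static long afterUnlock(long stamp) {
        stamp += WBIT;                          // the carry clears WBIT and bumps the version
        return stamp == 0L ? ORIGIN : stamp;    // on wrap-around, restart at ORIGIN, never 0
    }
}
```

For example, locking from ORIGIN (256) gives the stamp 384, and unlocking gives 512: the write bit is clear again, but the version differs from the original 256, so an older optimistic stamp no longer validates.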