第2章 Atomic类

整个Concurrent包的层次体系:

整个Concurrent包的层次体系

AtomicInteger和AtomicLong

对于一个整数的加减操作,要保证线程安全,需要加锁,也就是加synchronized关键字。

有了Concurrent包的Atomic相关的类之后,synchronized关键字可以用AtomicInteger代替,其性能更好。

其对应的源码如下:

public class AtomicInteger extends Number implements java.io.Serializable {
    
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    
    public final int getAndIncrement() {
        return U.getAndAddInt(this, VALUE, 1);
    }
    
    public final int getAndDecrement() {
        return U.getAndAddInt(this, VALUE, -1);
    }
}

public final class Unsafe {
    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
}

其源码很简单,但却反映了几个很重要的思想,下面一一说明。

悲观锁与乐观锁

对于悲观锁,认为数据发生并发冲突的概率很大,所以读操作之前就上锁。synchronized关键字,以及后面要讲的ReentrantLock都是悲观锁的典型例子。

对于乐观锁,认为数据发生并发冲突的概率比较小,所以读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS(Compare And Set)。

AtomicInteger的实现就是典型的乐观锁,在MySQL和Redis中有类似的思路。

Unsafe的CAS详解

上面调用的CAS函数,其实是封装的Unsafe类中的一个native函数。

AtomicInteger封装过的compareAndSet有两个参数。第一个参数expect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其他线程修改,所以称为expect);第二个参数update是指变量的新值(修改过的,希望写入的值)。当expect等于变量当前的值时,说明在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写入,变量被更新为update,返回true;否则返回false。

Unsafe类是整个Concurrent包的基础,里面所有函数都是native的。具体到compareAndSwapInt函数,如下所示:

	public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

该函数有4个参数。在前两个参数中,第一个是对象(也就是AtomicInteger 对象),第二个是对象的成员变量(也就是AtomictInteger里面包的int变量value),后两个参数保持不变。

要特别说明一下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身。在Unsafe中专门有一个函数,把成员变量转化成偏移量,如下所示:

	public native long objectFieldOffset(Field var1);

所有调用CAS的地方,都会先通过这个函数把成员变量转换成一个Offset。

	private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

从上面代码可以看到,无论是Unsafe还是valueOffset,都是静态的,也就是类级别的,所有对象共用的。

在转化的时候,先通过反射(getDeclaredField)获取value成员变量对应的Field对象,再通过objectFieldOffset函数转化成valueOffset。此处的valueOffset就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作valueOffset。

自旋与阻塞

当一个线程拿不到锁的时候,有以下两种基本的等待策略:

  • 策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
  • 策略2:不放弃CPU,空转,不断重试,也就是所谓的“自旋”。

很显然,如果是单核的CPU,只能用策略1。因为如果不放弃CPU,那么其他线程无法运行,也就无法释放锁。但对于多CPU或者多核,策略2就很有用了,因为没有线程切换的开销。

AtomicInteger的实现就用的是“自旋”策略,如果拿不到锁,就会一直重试。有一点要说明:这两种策略并不是互斥的,可以结合使用。如果拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronized关键字就是这样的实现策略。除了AtomicInteger,AtomicLong也是同样的原理。

AtomicBoolean和AtomicReference

为什么需要AtomicBoolean

对于int或者long型变量,需要进行加减操作,所以要加锁;但对于一个boolean类型来说,true或false的赋值和取值操作,加上volatile关键字就够了,为什么还需要AtomicBoolean呢?

这是因为往往要实现下面这种功能:

	if (flag == false) {
        flag = true;
        ...
    }

也就是要实现compare和set两个操作合在一起的原子性,而这也正是CAS提供的功能。上面的代码,就变成:

 if (compareAndSet(false, true)) {
     ...
 }

同样地,AtomicReference也需要同样的功能,对应的函数如下:

	public final boolean compareAndSet(V expect, V update) {
        return Unsafe.compareAndSwapObjevt(this, valueOffset, expect, update);
    }

其中,expect是旧的引用,update为新的引用。

如何支持boolean和double类型

在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。如下所示:

	public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

第一个参数是要修改的对象,第二个参数是对象的成员变量在内存中的位置(一个long型的整数),第三个参数是该变量的旧值,第四个参数是该变量的新值。

AtomicBoolean类型怎么支持呢?

对于用int型来代替的,在入参的时候,将boolean类型转换成int类型;在返回值的时候,将int类型转换成boolean类型。如下所示。

如果是double类型,又如何支持呢?这依赖double类型提供的一对double类型和long类型互转的函数。

public final class Double extends Number implements Comparable<Double> {
    public static native long doubleToRawLongBits(double value);
    public static native double longBitsToDouble(long bits);
}

AtomicStampedReference和AtomicMarkableReference

ABA问题与解决办法

到目前为止,CAS都是基于“值”来做比较的。但如果另外一个线程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。

要解决ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是AtomicStamped-Reference做的事情,其对应的CAS函数如下:

public class AtomicStampedReference<V> {
	public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
}

之前的CAS只有两个参数,这里的CAS有四个参数,后两个参数就是版本号的旧值和新值。

当expectedReference!=对象当前的reference时,说明该数据肯定被其他线程修改过;

当expectedReference==对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。

为什么没有AtomicStampedInteger或AtomictStampedLong

要解决Integer或者Long型变量的ABA问题,为什么只有AtomicStampedReference,而没有AtomicStampedInteger或者AtomictStampedLong呢?

因为这里要同时比较数据的“值”和“版本号”,而Integer型或者Long型的CAS没有办法同时比较两个变量,于是只能把值和版本号封装成一个对象,也就是这里面的Pair 内部类,然后通过对象引用的CAS来实现。

当使用的时候,在构造函数里面传入值和版本号两个参数,应用程序对版本号进行累加操作,然后调用CAS。

AtomicMarkableReference

AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是boolean类型的,而不是整型的累加变量。

因为是boolean类型,只能有true、false 两个版本号,所以并不能完全避免ABA问题,只是降低了ABA发生的概率。

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater

为什么需要AtomicXXXFieldUpdater

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和AtomicReferenceFieldUpdater。

首先,其构造函数是protected,不能直接构造其对象,必须通过它提供的一个静态函数来创建,如下所示:

	public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }

newUpdater(..)静态函数传入的是要修改的类(不是对象)和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。

若要修改某个对象的成员变量的值,再传入相应的对象,如下所示:

	public int getAndIncrement(T obj) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev + 1;
        } while (!compareAndSet(obj, prev, next));
        return prev;
    }

	public final boolean compareAndSet(T obj, int expect, int update) {
        accessCheck(obj);
        return U.compareAndSwapInt(obj, offset, expect, update);
    }

accecssCheck函数的作用是检查该obj是不是tclass类型,如果不是,则拒绝修改,抛出异常。

从代码可以看到,其CAS原理和AtomictInteger是一样的,底层都调用了Unsafe的compareAndSwapInt(..)函数。

限制条件

要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类),该限制从其构造函数中可以看到:

		AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                modifiers = field.getModifiers();
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            if (field.getType() != int.class)
                throw new IllegalArgumentException("Must be integer type");

            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            // Access to protected field members is restricted to receivers only
            // of the accessing class, or one of its subclasses, and the
            // accessing class must in turn be a subclass (or package sibling)
            // of the protected member's defining class.
            // If the updater refers to a protected field of a declaring class
            // outside the current package, the receiver argument will be
            // narrowed to the type of the accessing class.
            this.cclass = (Modifier.isProtected(modifiers) &&
                           tclass.isAssignableFrom(caller) &&
                           !isSamePackage(tclass, caller))
                          ? caller : tclass;
            this.tclass = tclass;
            this.offset = U.objectFieldOffset(field);
        }

至于AtomicLongFieldUpdater、AtomicReferenceFieldUpdater,也有类似的限制条件。其底层的CAS原理,也和AtomicLong、AtomicReference一样。

AtomicIntegerArray、AtomicLongArray和Atomic-ReferenceArray

Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。

使用方式

以AtomicIntegerArray为例,其使用方式如下:

	/**
     * Atomically increments by one the element at index {@code i}.
     *
     * @param i the index
     * @return the previous value
     */
    public final int getAndIncrement(int i) {
        return getAndAdd(i, 1);
    }

相比于AtomicInteger的getAndIncrement()函数,这里只是多了一个传入参数:数组的下标i。其他函数也与此类似,相比于AtomicInteger 的各种加减函数,也都是多一个下标i。

实现原理

其底层的CAS函数用的还是compareAndSwapInt,但是把数组下标i转化成对应的内存偏移量,所用的方法和之前的AtomicInteger不太一样,如下所示:

	private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }

下标i 转换成对应的内存地址,用到了shift和base两个变量。这两个变量都是AtomicIntegerArray的静态成员变量,用了Unsafe类的arrayBaseOffset和arrayIndexScale两个函数来获取。赋值如下:

	private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final int base = unsafe.arrayBaseOffset(int[].class);
    private static final int shift;
    private final int[] array;

    static {
        int scale = unsafe.arrayIndexScale(int[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }

其中,base表示数组的首地址的位置,scale表示一个数组元素的大小,i的偏移量则等于:i*scale+base。

但为了优化性能,使用了位移操作,shift表示scale中1的位置(scale是2的整数次方)。所以,偏移量的计算变成上面代码中的:i << shift+base,表达的意思就是:i*scale+base

知道了偏移量的计算方式,理解CAS操作就容易了:

 	/**
     * Atomically adds the given value to the element at index {@code i}.
     *
     * @param i the index
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int i, int delta) {
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

	private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);

        return byteOffset(i);
    }

另外两个数组的原子类实现原理与之类似。

Striped64与LongAdder

从JDK 8开始,针对Long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。Striped64相关的类的继承层次如下图所示:

Striped64相关的类的继承层次

既然已有了AtomicLong,为什么还要提供LongAdder?并且还提供了LongAccumulator?

LongAdder原理

AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发的场景下仍不够快,如果再要提高性能,该怎么做呢?

把一个变量拆成多份,变为多个变量,有些类似于ConcurrentHashMap 的分段锁的例子。如图所示,把一个Long型拆成一个base变量外加多个Cell,每个Cell包装了一个Long型变量。当多个线程并发累加的时候,如果并发度低,就直接加到base变量上;如果并发度高,冲突大,平摊到这些Cell上。在最后取值的时候,再把base和这些Cell求sum运算。

1个Long型变量被拆成多个Long型的示意图

以LongAdder的sum()函数为例,如下所示:

public class LongAdder extends Striped64 implements Serializable {
	public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
}

由于无论是long,还是double,都是64位的。但因为没有double型的CAS操作,所以是通过把double型转化成long型来实现的。所以,上面的base和cell[]变量,是位于基类Striped64当中的。英文Striped意为“条带”,也就是分片。

abstract class Striped64 extends Number {
	/**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;
    
    @sun.misc.Contended static final class Cell {
        volatile long value;
        
        ...
    }
    
    ...
}

最终一致性

在sum求和函数中,并没有对cells[]数组加锁。也就是说,一边有线程对其执行求和操作,一边还有线程修改数组里的值,也就是最终一致性,而不是强一致性。这也类似于ConcurrentHashMap 中的clear()函数,一边执行清空操作,一边还有线程放入数据,clear()函数调用完毕后再读取,hash map里面可能还有元素。因此,在LongAdder开篇的注释中,把它和AtomicLong 的使用场景做了比较。它适合高并发的统计场景,而不适合要对某个Long 型变量进行严格同步的场景。

伪共享与缓存行填充

在Cell类的定义中,用了一个独特的注解@sun.misc.Contended,这是JDK 8之后才有的,背后涉及一个很重要的优化原理:伪共享与缓存行填充。

在讲CPU 架构的时候提到过,每个CPU 都有自己的缓存。缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行)。在64位x86架构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。

如图所示,主内存中有变量X、Y、Z(假设每个变量都是一个Long型),被CPU1和CPU2分别读入自己的缓存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,也就是往总线上发消息,通知CPU 2对应的Cache Line失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。

伪共享示意图

虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修改,本应该很好地被CPU1和CPU2共享,却没做到,这就是所谓的“伪共享问题”。

问题的原因是,Y、Z和X变量处在了同一行Cache Line里面。要解决这个问题,需要用到所谓的“缓存行填充”,分别在X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓存行中,如图所示:

缓存行填充示意图

JDK 7的Exchanger类,为了安全,它填充了15(8+7)个long型。

在著名的开源无锁并发框架Disruptor中,也有类似的代码。

image-20220310102059108

而在JDK 8中,就不需要写这种晦涩的代码了,只需声明一个@sun.misc.Contended即可。

之所以这个地方要用缓存行填充,是为了不让Cell[]数组中相邻的元素落到同一个缓存行里。

LongAdder核心实现

下面来看LongAdder最核心的累加函数add(long x),自增、自减操作都是通过调用该函数实现的。

	/**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

	public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

当一个线程调用add(x)的时候,首先会尝试使用casBase把x加到base变量上。如果不成功,则再用a.cas(..)函数尝试把x加到Cell数组的某个元素上。如果还不成功,最后再调用longAccumulate(..)函数。

注意:Cell[]数组的大小始终是2的整数次方,在运行中会不断扩容,每次扩容都是增长2倍。上面代码中的as[getProbe()&m]其实就是对数组的大小取模。因为m=as.length–1,getProbe()为该线程生成一个随机数,用该随机数对数组的长度取模。因为数组长度是2的整数次方,所以可以用&操作来优化取模运算。

对于一个线程来说,它并不在意到底是把x累加到base上面,还是累加到Cell[]数组上面,只要累加成功就可以。因此,这里使用随机数来实现Cell的长度取模。

如果两次尝试都不成功,则调用longAccumulate(..)函数,该函数在Striped64里面LongAccumulator也会用到,如下所示。

	/**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

上面的函数fn就是LongAccumulator要用到的,但对于LongAdder而言,fn=null,就是简单的累加操作v+x。

上面的for循环被分成三个大的分支。在第二个分支里面,进行了Cells[]数组的初始化工作,初始大小为2,然后把x累加在0下标或者1下标对应的Cell上面。

					if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }						

在第一个大的分支里面,完成Cells[]数组的不断扩容,每次扩容都是增长2倍。

						if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }

数组为空,并且有一个线程正在进行初始化工作,于是进入第三个大的分支中,尝试对base变量进行累积,如果再次失败,则会再次进入第一个大的分支。

			else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base

LongAccumulator

LongAccumulator的原理和LongAdder类似,只是功能更强大,下面为两者构造函数的对比:

	/**
     * Creates a new adder with initial sum of zero.
     */
    public LongAdder() {
    }
	/**
     * Creates a new instance using the given accumulator function
     * and identity element.
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @param identity identity (initial value) for the accumulator function
     */
    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

LongAdder只能进行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操作符,并且可以传入一个初始值。

public interface LongBinaryOperator {

    /**
     * Applies this operator to the given operands.
     *
     * @param left the first operand
     * @param right the second operand
     * @return the operator result
     */
    long applyAsLong(long left, long right);
}

操作符的左值,就是base变量或者Cells[]中元素的当前值;右值,就是add()函数传入的参数x。

下面是LongAccumulator的accumulate(x)函数,与LongAdder的add(x)函数类似,最后都是调用的Striped64的LongAccumulate(..)函数。唯一的差别就是LongAdder的add(x)函数调用的是casBase(b,b+x),这里调用的是casBase(b,r),其中,r=function.applyAsLong(b=base,x)。

	/**
     * Updates with the given value.
     *
     * @param x the value
     */
    public void accumulate(long x) {
        Cell[] as; long b, v, r; int m; Cell a;
        if ((as = cells) != null ||
            (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                  (r = function.applyAsLong(v = a.value, x)) == v ||
                  a.cas(v, r)))
                longAccumulate(x, function, uncontended);
        }
    }

DoubleAdder与DoubleAccumulator

DoubleAdder 其实也是用long 型实现的,因为没有double类型的CAS 函数。下面是DoubleAdder的add(x)函数,和LongAdder的add(x)函数基本一样,只是多了long和double类型的相互转换。

	/**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(double x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null ||
            !casBase(b = base,
                     Double.doubleToRawLongBits
                     (Double.longBitsToDouble(b) + x))) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value,
                                      Double.doubleToRawLongBits
                                      (Double.longBitsToDouble(v) + x))))
                doubleAccumulate(x, null, uncontended);
        }
    }

其中的关键Double.doubleToRawLongBits(Double.longBitsToDouble(b)+x),在读出来的时候,它把long 类型转换成double 类型,然后进行累加,累加的结果再转换成long类型,通过CAS写回去。

DoubleAccumulate也是Striped64的成员函数,和longAccumulate类似,也是多了long类型和double类型的互相转换。

DoubleAccumulator和DoubleAdder的关系,与LongAccumulator和LongAdder的关系类似,只是多了一个二元操作符。