LongAdder和Sync和Atomic

LongAdder内部使用了分段锁,将单一的CAS操作分散为对数组Cells中多个元祖的CAS

在高并发(千、万级并发以上)LongAdder的效率大于Atomic大于synchronized

Atomic

Atomic 原子类介绍

Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

所以,所谓原子类说简单点就是具有原子/原子操作特征的类。

并发包 java.util.concurrent 的原子类都存放在java.util.concurrent.atomic下,如下图所示。

JUC原子类概览

根据操作的数据类型,可以将 JUC 包中的原子类分为 4 类

基本类型

使用原子的方式更新基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean :布尔型原子类

数组类型

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整型数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicReferenceArray :引用类型数组原子类

引用类型

  • AtomicReference:引用类型原子类
  • AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicStampedReference :原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

对象的属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段

🐛 修正(参见:issue#626 : AtomicMarkableReference 不能解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    /**

AtomicMarkableReference是将一个boolean值作是否有更改的标记,本质就是它的版本号只有两个,true和false,

修改的时候在这两个版本号之间来回切换,这样做并不能解决ABA的问题,只是会降低ABA问题发生的几率而已

@author : mazh

@Date : 2020/1/17 14:41
*/

public class SolveABAByAtomicMarkableReference {

private static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100, false);

public static void main(String[] args) {

Thread refT1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicMarkableReference.compareAndSet(100, 101, atomicMarkableReference.isMarked(), !atomicMarkableReference.isMarked());
atomicMarkableReference.compareAndSet(101, 100, atomicMarkableReference.isMarked(), !atomicMarkableReference.isMarked());
});

Thread refT2 = new Thread(() -> {
boolean marked = atomicMarkableReference.isMarked();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean c3 = atomicMarkableReference.compareAndSet(100, 101, marked, !marked);
System.out.println(c3); // 返回true,实际应该返回false
});

refT1.start();
refT2.start();
}
}

CAS ABA 问题

  • 描述: 第一个线程取到了变量 x 的值 A,然后巴拉巴拉干别的事,总之就是只拿到了变量 x 的值 A。这段时间内第二个线程也取到了变量 x 的值 A,然后把变量 x 的值改为 B,然后巴拉巴拉干别的事,最后又把变量 x 的值变为 A (相当于还原了)。在这之后第一个线程终于进行了变量 x 的操作,但是此时变量 x 的值还是 A,所以 compareAndSet 操作是成功。
  • 例子描述(可能不太合适,但好理解): 年初,现金为零,然后通过正常劳动赚了三百万,之后正常消费了(比如买房子)三百万。年末,虽然现金零收入(可能变成其他形式了),但是赚了钱是事实,还是得交税的!
  • 代码例子(以AtomicInteger为例)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDefectDemo {
public static void main(String[] args) {
defectOfABA();
}

static void defectOfABA() {
final AtomicInteger atomicInteger = new AtomicInteger(1);

Thread coreThread = new Thread(
() -> {
final int currentValue = atomicInteger.get();
System.out.println(Thread.currentThread().getName() + " ------ currentValue=" + currentValue);

// 这段目的:模拟处理其他业务花费的时间
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean casResult = atomicInteger.compareAndSet(1, 2);
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);
}
);
coreThread.start();

// 这段目的:为了让 coreThread 线程先跑起来
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

Thread amateurThread = new Thread(
() -> {
int currentValue = atomicInteger.get();
boolean casResult = atomicInteger.compareAndSet(1, 2);
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);

currentValue = atomicInteger.get();
casResult = atomicInteger.compareAndSet(2, 1);
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);
}
);
amateurThread.start();
}
}

输出内容如下:

1
2
3
4
Thread-0 ------ currentValue=1
Thread-1 ------ currentValue=1, finalValue=2, compareAndSet Result=true
Thread-1 ------ currentValue=2, finalValue=1, compareAndSet Result=true
Thread-0 ------ currentValue=1, finalValue=2, compareAndSet Result=true

下面我们来详细介绍一下这些原子类。

基本类型原子类

基本类型原子类介绍

使用原子的方式更新基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean :布尔型原子类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicInteger 为例子来介绍。

AtomicInteger 类常用方法

1
2
3
4
5
6
7
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 常见方法使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

public static void main(String[] args) {
// TODO Auto-generated method stub
int temvalue = 0;
AtomicInteger i = new AtomicInteger(0);
temvalue = i.getAndSet(3);
System.out.println("temvalue:" + temvalue + "; i:" + i);//temvalue:0; i:3
temvalue = i.getAndIncrement();
System.out.println("temvalue:" + temvalue + "; i:" + i);//temvalue:3; i:4
temvalue = i.getAndAdd(5);
System.out.println("temvalue:" + temvalue + "; i:" + i);//temvalue:4; i:9
}

}

基本数据类型原子类的优势

通过一个简单例子带大家看一下基本数据类型原子类的优势

① 多线程环境不使用原子类保证线程安全(基本数据类型)

1
2
3
4
5
6
7
8
9
10
11
class Test {
private volatile int count = 0;
//若要线程安全执行执行count++,需要加锁
public synchronized void increment() {
count++;
}

public int getCount() {
return count;
}
}

② 多线程环境使用原子类保证线程安全(基本数据类型)

1
2
3
4
5
6
7
8
9
10
11
12
class Test2 {
private AtomicInteger count = new AtomicInteger();

public void increment() {
count.incrementAndGet();
}
//使用AtomicInteger之后,不需要加锁,也可以实现线程安全。
public int getCount() {
return count.get();
}
}

AtomicInteger 线程安全原理简单分析

AtomicInteger 类的部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个 volatile 变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。

数组类型原子类

数组类型原子类介绍

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray :引用类型数组原子类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。

AtomicIntegerArray 类常用方法

1
2
3
4
5
6
7
public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicIntegerArray 常见方法使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayTest {

public static void main(String[] args) {
// TODO Auto-generated method stub
int temvalue = 0;
int[] nums = { 1, 2, 3, 4, 5, 6 };
AtomicIntegerArray i = new AtomicIntegerArray(nums);
for (int j = 0; j < nums.length; j++) {
System.out.println(i.get(j));
}
temvalue = i.getAndSet(0, 2);
System.out.println("temvalue:" + temvalue + "; i:" + i);
temvalue = i.getAndIncrement(0);
System.out.println("temvalue:" + temvalue + "; i:" + i);
temvalue = i.getAndAdd(0, 5);
System.out.println("temvalue:" + temvalue + "; i:" + i);
}

}

引用类型原子类

引用类型原子类介绍

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

上面三个类提供的方法几乎相同,所以我们这里以 AtomicReference 为例子来介绍。

AtomicReference 类使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

public static void main(String[] args) {
AtomicReference<Person> ar = new AtomicReference<Person>();
Person person = new Person("SnailClimb", 22);
ar.set(person);
Person updatePerson = new Person("Daisy", 20);
ar.compareAndSet(person, updatePerson);

System.out.println(ar.get().getName());
System.out.println(ar.get().getAge());
}
}

class Person {
private String name;
private int age;

public Person(String name, int age) {
super();
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

}

上述代码首先创建了一个 Person 对象,然后把 Person 对象设置进 AtomicReference 对象中,然后调用 compareAndSet 方法,该方法就是通过 CAS 操作设置 ar。如果 ar 的值为 person 的话,则将其设置为 updatePerson。实现原理与 AtomicInteger 类中的 compareAndSet 方法相同。运行上面的代码后的输出结果如下:

1
2
Daisy
20

AtomicStampedReference 类使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceDemo {
public static void main(String[] args) {
// 实例化、取当前值和 stamp 值
final Integer initialRef = 0, initialStamp = 0;
final AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(initialRef, initialStamp);
System.out.println("currentValue=" + asr.getReference() + ", currentStamp=" + asr.getStamp());

// compare and set
final Integer newReference = 666, newStamp = 999;
final boolean casResult = asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
System.out.println("currentValue=" + asr.getReference()
+ ", currentStamp=" + asr.getStamp()
+ ", casResult=" + casResult);

// 获取当前的值和当前的 stamp 值
int[] arr = new int[1];
final Integer currentValue = asr.get(arr);
final int currentStamp = arr[0];
System.out.println("currentValue=" + currentValue + ", currentStamp=" + currentStamp);

// 单独设置 stamp 值
final boolean attemptStampResult = asr.attemptStamp(newReference, 88);
System.out.println("currentValue=" + asr.getReference()
+ ", currentStamp=" + asr.getStamp()
+ ", attemptStampResult=" + attemptStampResult);

// 重新设置当前值和 stamp 值
asr.set(initialRef, initialStamp);
System.out.println("currentValue=" + asr.getReference() + ", currentStamp=" + asr.getStamp());

// [不推荐使用,除非搞清楚注释的意思了] weak compare and set
// 困惑!weakCompareAndSet 这个方法最终还是调用 compareAndSet 方法。[版本: jdk-8u191]
// 但是注释上写着 "May fail spuriously and does not provide ordering guarantees,
// so is only rarely an appropriate alternative to compareAndSet."
// todo 感觉有可能是 jvm 通过方法名在 native 方法里面做了转发
final boolean wCasResult = asr.weakCompareAndSet(initialRef, newReference, initialStamp, newStamp);
System.out.println("currentValue=" + asr.getReference()
+ ", currentStamp=" + asr.getStamp()
+ ", wCasResult=" + wCasResult);
}
}

输出结果如下:

1
2
3
4
5
6
currentValue=0, currentStamp=0
currentValue=666, currentStamp=999, casResult=true
currentValue=666, currentStamp=999
currentValue=666, currentStamp=88, attemptStampResult=true
currentValue=0, currentStamp=0
currentValue=666, currentStamp=999, wCasResult=true

AtomicMarkableReference 类使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.atomic.AtomicMarkableReference;

public class AtomicMarkableReferenceDemo {
public static void main(String[] args) {
// 实例化、取当前值和 mark 值
final Boolean initialRef = null, initialMark = false;
final AtomicMarkableReference<Boolean> amr = new AtomicMarkableReference<>(initialRef, initialMark);
System.out.println("currentValue=" + amr.getReference() + ", currentMark=" + amr.isMarked());

// compare and set
final Boolean newReference1 = true, newMark1 = true;
final boolean casResult = amr.compareAndSet(initialRef, newReference1, initialMark, newMark1);
System.out.println("currentValue=" + amr.getReference()
+ ", currentMark=" + amr.isMarked()
+ ", casResult=" + casResult);

// 获取当前的值和当前的 mark 值
boolean[] arr = new boolean[1];
final Boolean currentValue = amr.get(arr);
final boolean currentMark = arr[0];
System.out.println("currentValue=" + currentValue + ", currentMark=" + currentMark);

// 单独设置 mark 值
final boolean attemptMarkResult = amr.attemptMark(newReference1, false);
System.out.println("currentValue=" + amr.getReference()
+ ", currentMark=" + amr.isMarked()
+ ", attemptMarkResult=" + attemptMarkResult);

// 重新设置当前值和 mark 值
amr.set(initialRef, initialMark);
System.out.println("currentValue=" + amr.getReference() + ", currentMark=" + amr.isMarked());

// [不推荐使用,除非搞清楚注释的意思了] weak compare and set
// 困惑!weakCompareAndSet 这个方法最终还是调用 compareAndSet 方法。[版本: jdk-8u191]
// 但是注释上写着 "May fail spuriously and does not provide ordering guarantees,
// so is only rarely an appropriate alternative to compareAndSet."
// todo 感觉有可能是 jvm 通过方法名在 native 方法里面做了转发
final boolean wCasResult = amr.weakCompareAndSet(initialRef, newReference1, initialMark, newMark1);
System.out.println("currentValue=" + amr.getReference()
+ ", currentMark=" + amr.isMarked()
+ ", wCasResult=" + wCasResult);
}
}

输出结果如下:

1
2
3
4
5
6
currentValue=null, currentMark=false
currentValue=true, currentMark=true, casResult=true
currentValue=true, currentMark=true
currentValue=true, currentMark=false, attemptMarkResult=true
currentValue=null, currentMark=false
currentValue=true, currentMark=true, wCasResult=true

对象的属性修改类型原子类

对象的属性修改类型原子类介绍

如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类。

  • AtomicIntegerFieldUpdater:原子更新整形字段的更新器
  • AtomicLongFieldUpdater:原子更新长整形字段的更新器
  • AtomicReferenceFieldUpdater :原子更新引用类型里的字段的更新器

要想原子地更新对象的属性需要两步。第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新的对象属性必须使用 public volatile 修饰符。

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerFieldUpdater为例子来介绍。

AtomicIntegerFieldUpdater 类使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {
public static void main(String[] args) {
AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

User user = new User("Java", 22);
System.out.println(a.getAndIncrement(user));// 22
System.out.println(a.get(user));// 23
}
}

class User {
private String name;
public volatile int age;

public User(String name, int age) {
super();
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

}

输出结果:

1
2
22
23

LongAdder

LongAdder实现原理图

img

LongAdder则是内部维护一个Cells数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。

LongAdder维护了要给延迟初始化的原子性更新数组和一个基值变量base数组的大小保持是2的N次方大小,数组表的下标使用每个线程的hashcode值的掩码表示,数组里面的变量实体是Cell类型。

Cell 类型是Atomic的一个改进,用来减少缓存的争用,对于大多数原子操作字节填充是浪费的,因为原子操作都是无规律的分散在内存中进行的,多个原子性操作彼此之间是没有接触的,但是原子性数组元素彼此相邻存放将能经常共享缓存行,也就是伪共享。所以这在性能上是一个提升。

另外由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候再创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

LongAdder类关系图

img

Striped64类
Striped64是一个高并发累加的工具类。
Striped64的设计核心思路就是通过内部的分散计算来避免竞争。
Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表。
没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。所以整个Striped64的值为sum=base+∑[0~n]cells。

Striped64核心属性
transient volatile Cell[] cells; // 存放cell的hash表,大小为2乘幂
transient volatile long base; // 基础值(1.无竞争时更新,2.cells数组初始化过程不可用时,也会通过cas累加到base)
transient volatile int cellsBusy; // 自旋锁,通过CAS操作加锁(1.初始化cells数组,2.创建cell单元,3.cells扩容)

成员变量cells
cells数组是LongAdder高性能实现的必杀器:
AtomicInteger只有一个value,所有线程累加都要通过cas竞争value这一个变量,高并发下线程争用非常严重;
而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,这时使用base做累加,有了竞争后cells数组就上场了,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器cpu的数量为止就不在扩容(CPU能够并行的CAS操作的最大数量是它的核心数),每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 为提高性能,使用注解@sun.misc.Contended,用来避免伪共享
// 伪共享简单来说就是会破坏其它线程在缓存行中的值,导致重新从主内存读取,降低性能。
@sun.misc.Contended static final class Cell {
//用来保存要累加的值
volatile long value;
Cell(long x) { value = x; }
//使用UNSAFE类的cas来更新value值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
//value在Cell类中存储位置的偏移量;
private static final long valueOffset;
//这个静态方法用于获取偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

成员变量cellsBusy
cellsBusy作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁,加锁的状况有三种

  1. cells数组初始化的时候
  2. cells数组扩容的时候
  3. 如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候

成员变量base
它有两个作用:

  1. 在开始没有竞争的情况下,将累加值累加到base
  2. 在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上

LongAdder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// 累加方法,参数x为累加的值    
public void add(long x) {
/**
* as 表示cells引用
* b 表示获取的base值
* v 表示期望值
* m 表示cells数组的长度
* a 表示当前线程命中的cell单元格
/
Cell[] as; long b, v; int m; Cell a;
/**
* 如果一下两种条件则继续执行if内的语句
* 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
* 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
* casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
* casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
//uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
boolean uncontended = true;
/**
*1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
*2. (m = as.length - 1) < 0: cells数组的长度为0
*条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
*3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
*4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
如果进入if语句执行longAccumulate方法,有三种情况
1. 前两个条件代表cells没有初始化,
2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
3. 第四个条件代表产生了冲突,uncontended=false
**/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
//获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;
//另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
//1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;
//2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;
//凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
int h;
if ((h = getProbe()) == 0) {
//初始化ThreadLocalRandom;
ThreadLocalRandom.current(); // force initialization
//将h设置为0x9e3779b9
h = getProbe();
//设置未竞争标记为true
wasUncontended = true;
}
//cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
//这个主干if有三个分支
//1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
//2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
//3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上
//先看主分支一
if ((as = cells) != null && (n = as.length) > 0) {
/**
*内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上
*/
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy == 0 代表当前没有线程cells数组做修改
if (cellsBusy == 0) {
//将要累加的x值作为初始值创建一个新的Cell对象,
Cell r = new Cell(x);
//如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
if (cellsBusy == 0 && casCellsBusy()) {
//标记Cell是否创建成功并放入到cells数组被hash的位置上
boolean created = false;
try {
Cell[] rs; int m, j;
//再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//将新的cell设置到该位置
rs[j] = r;
created = true;
}
} finally {
//去掉锁
cellsBusy = 0;
}
//生成成功,跳出循环
if (created)
break;
//如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。
continue;
}
}
//如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
collide = false;

/**
*内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;
*/
} else if (!wasUncontended)
//设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。
wasUncontended = true;
/**
*内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
/**
*内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值*/
else if (n >= NCPU || cells != as)
collide = false;
/**
*内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
*/
else if (!collide)
//设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
collide = true;
/**
*内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//检查cells是否已经被扩容
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//为当前线程重新计算hash值
h = advanceProbe(h);

//这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。
}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//解锁
cellsBusy = 0;
}
//如果init为true说明初始化成功,跳出循环
if (init)
break;
}
/**
*如果以上操作都失败了,则尝试将值累加到base上;
*/
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // Fall back on using base
break;
}
}

hash生成策略
hash决定了当前线程将累加值定位到哪个cell中,hash算法尤其重要。
hash就是java的Thread类里面有一个成员变量,初始值为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;

// LongAdder的父类Striped64里通过getProbe方法获取当前线程threadLocalRandomProbe
static final int getProbe() {
// PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// threadLocalRandomProbe初始化
// 线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
//设置未竞争标记为true
wasUncontended = true;
}

static final void localInit() {
// private static final AtomicInteger probeGenerator = new AtomicInteger();
// private static final int PROBE_INCREMENT = 0x9e3779b9;
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}

threadLocalRandomProbe重新生成
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}