JUC-LongAdder源码解析

J.U.C - LongAdder & LongAccumulator

在高并发、多线程环境下,对一个对象的字段进行修改是很危险的操作。如果不采用特殊的手段,修改后的数据是无法实现与设计一致的。

  • 实现线程安全的修改对象字段大致有以下几种方式:
    • 加锁:synchronized加锁,Reentrant Lock加锁
    • 使用JUC原子类: 如AtomicInteger, LongAdder, LongAccumulator
  • 本文将使用4种不同的方式实现多线程环境下的i++操作,并比较不同方式的性能差异及实现原理
  • 结果显示,JUC下的LongAdderLongAccumulator性能最为优越

实验代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class ClickNum {
int num = 0;
public synchronized void clickBySync(){
num++;
}

AtomicLong atomicLong = new AtomicLong(0);
public void clickByAtomicLong(){
atomicLong.getAndIncrement();
}

LongAdder longAdder = new LongAdder();
public void clickByLongAdder(){
longAdder.increment();
}

LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y, 0);
public void clickByLongAccumulator(){
longAccumulator.accumulate(1);
}


}

/**
* 50 threads, each click 1m times
*/
public class ClickDemo {
public static final int _1W = 10000;
public static final int threadNum = 50;

public static void main(String[] args) throws InterruptedException {
ClickNum clickNum = new ClickNum();
long starttime;
long endtime;


CountDownLatch countDownLatch = new CountDownLatch(threadNum);
CountDownLatch countDownLatch1 = new CountDownLatch(threadNum);
CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
CountDownLatch countDownLatch3 = new CountDownLatch(threadNum);

//0
starttime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try{
for (int j = 0; j < 100 * _1W; j++) {

clickNum.clickBySync();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch.await();
endtime = System.currentTimeMillis();
System.out.println("synchronized : " + clickNum.num + "\t timecost : " + (endtime -starttime));


//1
starttime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try{
for (int j = 0; j < 100 * _1W; j++) {
clickNum.clickByAtomicLong();
}
}finally {
countDownLatch1.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch1.await();
endtime = System.currentTimeMillis();
System.out.println("clickByAtomicLong : " + clickNum.atomicLong + "\t timecost : " + (endtime -starttime));

//2
starttime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try{
for (int j = 0; j < 100 * _1W; j++) {

clickNum.clickByLongAdder();
}
}finally {
countDownLatch2.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch2.await();
endtime = System.currentTimeMillis();
System.out.println("clickByLongAdder : " + clickNum.longAdder.sum() + "\t timecost : " + (endtime -starttime));

//3
starttime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try{
for (int j = 0; j < 100 * _1W; j++) {

clickNum.clickByLongAccumulator();
}
}finally {
countDownLatch3.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch3.await();
endtime = System.currentTimeMillis();
System.out.println("clickByLongAccumulator : " + clickNum.longAccumulator.get() + "\t timecost : " + (endtime -starttime));

}
}

实验结果

synchronized : 50000000 timecost : 1378
clickByAtomicLong : 50000000 timecost : 2262
clickByLongAdder : 50000000 timecost : 108
clickByLongAccumulator : 50000000 timecost : 110

  • 使用了4种不同的方式实现50个线程,每个线程让共享变量1million次i++。
  • longadderlongaccumulator的耗时仅为100多毫秒,而使用 synchronizedAtomicLong 的耗时均在1000ms以上。

源码分析

Synchronized & AtomicInteger - 悲观锁&乐观锁

众所周知,Synchronized 关键字修饰方法时,会对整个对象加锁,具体表现为:在字节码文件中,被Synchronized 关键字修饰的方法前后会插入monitor,即监视器,此为悲观锁的基本实现之一。

AtomicInteger或原子类,采用的CAS思想,实现的是乐观锁,即在线程尝试修改数据时,不会加锁,而是通过Compare and Swap实现修改。

在读多写少的情况下,读线程可直接获取该字段的值,而写线程因为没有采用加锁操作,且写操作较少,CAS不易失败或自旋次数少,写操作的速度会很快。

但是,在本文的实验中,对同一个字段的操作全部为写操作(i++),因此使用AtomicInteger时,CAS的失败次数极高,由此导致的自旋行为使得整体用时甚至超过了Synchronized 悲观锁的实现方式。

LongAdder & LongAccumulator - 负载均衡 | 动态扩容

进入LongAdderLongAccumulator的源码来看,两者其实非常相似,只是在LongAccumulator中多了一个applyfunc()

因此,我们就看LongAdder的源码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
	// increment()方法最终会调用这个方法
public void add(long x) {
//声明的一些变量
Cell[] as; long b, v; int m; Cell a;
//判断条件中间是||符号,所以只有当两个条件都是false才不会进入内部
//1.as有没有被初始化,没有的话,第一个条件为false
//2.cas操作更新base值,如果成功了返回false
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//双重判断as是否被初始化,as Cell数组的某个Cell是否初始化,cas操作某个Cell a的value
//没被初始化 || cas失败 的情况下,才会进一步调用longAccumulate方法
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

//非原子操作,得到的值只能保证最终一致性,而不能保证实时一致性
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
  • 首先,从sum()方法可以看出,LongAdder的值其实是有两部分组成,一部分是base值,另一部份是Cell[]数组中每个元素的value值求和而得。
  • longAdder.increment()方法会调用到add()方法。
      1. Cell[]数组没初始化的情况下,会尝试cas改变base
        1. 如果Cas成功,则结束
        2. 如果失败,则会进入第一个if内部
      2. Cell[]数组已经初始化的情况下,会直接进去结构体内部
    • 进入第一个if结构的内部
      • 判断Cell[]数组是否初始化,取hash值得到的数组元素是否初始化,尝试CAS改变该数组元素的value
        • 如果CAS成功了,就结束
        • 未初始化或者 CAS失败了,就调用longAccumulate()方法
  • 下一步可以看看longAccumulate()方法做了什么

longAccumtulate() 的代码里有很多的if-else,我们分层来看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//1. Cell[] 已初始化
if ((as = cells) != null && (n = as.length) > 0) {
//1.1 Cell数组的最后一个元素未初始化
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//1.2 wasUncontended值是传入进来的,如果是false,则证明在进入该方法前,CAS已经失败。-> 判断是否尝试过CAS
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//1.3 wasUncontended已经被修改为true,尝试一次CAS
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//1.4 CAS失败了,判断Cell数组是否已到达max size或者已经过时
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//1.5 检查collide标记,如果为false,则更改为true(能走到这一个条件分枝肯定是有冲突),如果已经是true了,则在一下个分支里进行扩容
else if (!collide)
collide = true;
//1.6 获取自旋锁 -> 扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//获取新的hash值,使得每一次循环都能在不同index处进行CAS
h = advanceProbe(h);
}
//2. 尝试获取自旋锁 - cellsBusy (其实就是个变量) -> 初始化整个Cell数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//3. CAS操作base值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

能够进入longAccumulate()这个方法的前提在于,CAS失败和Cell[]数组初始化失败。因此,该方法的主要作用就是:初始化,扩容,再次重试CAS。

循环体内部可以先分成三个不同条件分枝:

  1. Cell数组已经初始化
  2. Cell数组还未初始化,因此尝试去获取自旋锁cellsBusy
  3. 获取自旋锁失败,再次尝试CAS修改base值

后两个分支很明确了,第一个分支内还有多个if-else嵌套:

  1. 如果是最后一个Cell没有初始化,就去尝试attach new Cell
  2. wasUncontended值是传入进来的,如果是false,则证明在进入该方法前,CAS已经失败,在这一轮循环中,wasUncontended会被标记为true,然后在下一次循环中就会再一次尝试CAS
  3. 不论之前的CAS是尝试过而失败了还是根本没有过CAS,都进行一次CAS,如果这次成功就退出循环,如果失败,就进行下一个条件判断
  4. 之后的三个条件都是为了扩容Cell数组。首先先判断是否已经达到max size,如果达到了,就跳过后面的扩容,然后重新获取新的index,进行下一轮循环尝试CAS。如果没达到就进入下一个条件判断
  5. 如果此时collide值仍为false,则改为true,然后进行下一轮循环。如果此时已经被改为true,就证明上一轮循环就已经判定为碰撞了,这一轮循环应该进行扩容操作,所以进入下一个条件判断。
  6. 获取自旋锁,然后进行扩容

总结

相较于synchronize或原子类这样要么悲观锁要么乐观锁的简单粗暴的实现线程安全地修改共享变量,LongAdder的设计无异于是更加出色的。

LongAdder所采用的策略层层递进:

  1. 首先CAS修改base
  2. 一旦发生CAS失败,就使用Cell数组
  3. Cell数组要是没初始化就初始化,初始化的时候要是遇到自旋锁占用就再试一次CAS base
  4. 要是用了Cell还冲突说明容量不够,因此先检查是不是Cell没初始化
  5. 如果当前Cell数组都拉满了,就换个位置再试CAS
  6. 如果又失败了,就说明得扩容Cell数组了。
  7. 扩容的时候要检查是不是到max size了,如果已经max size了就没法扩容了,只能换个位置再来CAS。如果不是,那就扩容。

总的来说,这样的设计可以保证在低并发下,使用简单的CAS实现线程安全,节约资源;在高并发下,使用负载均衡,动态扩容的方式,保证性能,又能兼顾资源占用。

这样的设计在各类分布式系统中都可以见到,实在是妙!


JUC-LongAdder源码解析
http://kun98-liu.gihub.io/2022/09/21/JUC-LongAdder源码解析/
Author
Liu Jiankun
Posted on
September 21, 2022
Licensed under