J.U.C - LongAdder & LongAccumulator
在高并发、多线程环境下,对一个对象的字段进行修改是很危险的操作。如果不采用特殊的手段,修改后的数据是无法实现与设计一致的。
- 实现线程安全的修改对象字段大致有以下几种方式:
- 加锁:synchronized加锁,Reentrant Lock加锁
- 使用JUC原子类: 如
AtomicInteger
, LongAdder
, LongAccumulator
- 本文将使用4种不同的方式实现多线程环境下的
i++
操作,并比较不同方式的性能差异及实现原理
- 结果显示,JUC下的
LongAdder
及LongAccumulator
性能最为优越
实验代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder;
class ClickNum { int num = 0; public synchronized void clickBySync(){ num++; }
AtomicLong atomicLong = new AtomicLong(0); public void clickByAtomicLong(){ atomicLong.getAndIncrement(); }
LongAdder longAdder = new LongAdder(); public void clickByLongAdder(){ longAdder.increment(); }
LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y, 0); public void clickByLongAccumulator(){ longAccumulator.accumulate(1); }
}
public class ClickDemo { public static final int _1W = 10000; public static final int threadNum = 50;
public static void main(String[] args) throws InterruptedException { ClickNum clickNum = new ClickNum(); long starttime; long endtime;
CountDownLatch countDownLatch = new CountDownLatch(threadNum); CountDownLatch countDownLatch1 = new CountDownLatch(threadNum); CountDownLatch countDownLatch2 = new CountDownLatch(threadNum); CountDownLatch countDownLatch3 = new CountDownLatch(threadNum);
starttime = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(() -> { try{ for (int j = 0; j < 100 * _1W; j++) {
clickNum.clickBySync(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } countDownLatch.await(); endtime = System.currentTimeMillis(); System.out.println("synchronized : " + clickNum.num + "\t timecost : " + (endtime -starttime));
starttime = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(() -> { try{ for (int j = 0; j < 100 * _1W; j++) { clickNum.clickByAtomicLong(); } }finally { countDownLatch1.countDown(); } },String.valueOf(i)).start(); } countDownLatch1.await(); endtime = System.currentTimeMillis(); System.out.println("clickByAtomicLong : " + clickNum.atomicLong + "\t timecost : " + (endtime -starttime));
starttime = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(() -> { try{ for (int j = 0; j < 100 * _1W; j++) {
clickNum.clickByLongAdder(); } }finally { countDownLatch2.countDown(); } },String.valueOf(i)).start(); } countDownLatch2.await(); endtime = System.currentTimeMillis(); System.out.println("clickByLongAdder : " + clickNum.longAdder.sum() + "\t timecost : " + (endtime -starttime));
starttime = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(() -> { try{ for (int j = 0; j < 100 * _1W; j++) {
clickNum.clickByLongAccumulator(); } }finally { countDownLatch3.countDown(); } },String.valueOf(i)).start(); } countDownLatch3.await(); endtime = System.currentTimeMillis(); System.out.println("clickByLongAccumulator : " + clickNum.longAccumulator.get() + "\t timecost : " + (endtime -starttime));
} }
|
实验结果
synchronized : 50000000 timecost : 1378
clickByAtomicLong : 50000000 timecost : 2262
clickByLongAdder : 50000000 timecost : 108
clickByLongAccumulator : 50000000 timecost : 110
- 使用了4种不同的方式实现50个线程,每个线程让共享变量1million次i++。
longadder
和 longaccumulator
的耗时仅为100多毫秒,而使用 synchronized
和 AtomicLong
的耗时均在1000ms以上。
源码分析
Synchronized
& AtomicInteger
- 悲观锁&乐观锁
众所周知,Synchronized
关键字修饰方法时,会对整个对象加锁,具体表现为:在字节码文件中,被Synchronized
关键字修饰的方法前后会插入monitor,即监视器,此为悲观锁的基本实现之一。
而AtomicInteger
或原子类,采用的CAS思想,实现的是乐观锁,即在线程尝试修改数据时,不会加锁,而是通过Compare and Swap实现修改。
在读多写少的情况下,读线程可直接获取该字段的值,而写线程因为没有采用加锁操作,且写操作较少,CAS不易失败或自旋次数少,写操作的速度会很快。
但是,在本文的实验中,对同一个字段的操作全部为写操作(i++
),因此使用AtomicInteger
时,CAS的失败次数极高,由此导致的自旋行为使得整体用时甚至超过了Synchronized
悲观锁的实现方式。
LongAdder
& LongAccumulator
- 负载均衡 | 动态扩容
进入LongAdder
和LongAccumulator
的源码来看,两者其实非常相似,只是在LongAccumulator
中多了一个applyfunc()
。
因此,我们就看LongAdder的源码即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
|
- 首先,从
sum()
方法可以看出,LongAdder的值其实是有两部分组成,一部分是base
值,另一部份是Cell[]
数组中每个元素的value
值求和而得。
longAdder.increment()
方法会调用到add()
方法。
- 在
Cell[]
数组没初始化的情况下,会尝试cas改变base
值
- 如果Cas成功,则结束
- 如果失败,则会进入第一个if内部
- 在
Cell[]
数组已经初始化的情况下,会直接进去结构体内部
- 进入第一个if结构的内部
- 判断
Cell[]
数组是否初始化,取hash值得到的数组元素是否初始化,尝试CAS改变该数组元素的value
- 如果CAS成功了,就结束
- 未初始化或者 CAS失败了,就调用
longAccumulate()
方法
- 下一步可以看看
longAccumulate()
方法做了什么
在longAccumtulate()
的代码里有很多的if-else,我们分层来看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
|
能够进入longAccumulate()
这个方法的前提在于,CAS失败和Cell[]数组初始化失败。因此,该方法的主要作用就是:初始化,扩容,再次重试CAS。
循环体内部可以先分成三个不同条件分枝:
- Cell数组已经初始化
- Cell数组还未初始化,因此尝试去获取自旋锁
cellsBusy
- 获取自旋锁失败,再次尝试CAS修改base值
后两个分支很明确了,第一个分支内还有多个if-else嵌套:
- 如果是最后一个Cell没有初始化,就去尝试attach new Cell
wasUncontended
值是传入进来的,如果是false,则证明在进入该方法前,CAS已经失败,在这一轮循环中,wasUncontended
会被标记为true,然后在下一次循环中就会再一次尝试CAS
- 不论之前的CAS是尝试过而失败了还是根本没有过CAS,都进行一次CAS,如果这次成功就退出循环,如果失败,就进行下一个条件判断
- 之后的三个条件都是为了扩容Cell数组。首先先判断是否已经达到max size,如果达到了,就跳过后面的扩容,然后重新获取新的index,进行下一轮循环尝试CAS。如果没达到就进入下一个条件判断
- 如果此时
collide
值仍为false,则改为true,然后进行下一轮循环。如果此时已经被改为true,就证明上一轮循环就已经判定为碰撞了,这一轮循环应该进行扩容操作,所以进入下一个条件判断。
- 获取自旋锁,然后进行扩容
总结
相较于synchronize
或原子类这样要么悲观锁要么乐观锁的简单粗暴的实现线程安全地修改共享变量,LongAdder
的设计无异于是更加出色的。
LongAdder
所采用的策略层层递进:
- 首先CAS修改base
- 一旦发生CAS失败,就使用Cell数组
- Cell数组要是没初始化就初始化,初始化的时候要是遇到自旋锁占用就再试一次CAS base
- 要是用了Cell还冲突说明容量不够,因此先检查是不是Cell没初始化
- 如果当前Cell数组都拉满了,就换个位置再试CAS
- 如果又失败了,就说明得扩容Cell数组了。
- 扩容的时候要检查是不是到max size了,如果已经max size了就没法扩容了,只能换个位置再来CAS。如果不是,那就扩容。
总的来说,这样的设计可以保证在低并发下,使用简单的CAS实现线程安全,节约资源;在高并发下,使用负载均衡,动态扩容的方式,保证性能,又能兼顾资源占用。
这样的设计在各类分布式系统中都可以见到,实在是妙!