LongAdder and LongAccumulator are suited to high-volume statistics such as counters and metrics, where the value read at any given instant does not have to be exact.

The basic idea of LongAdder is to spread out the hot spot. Instead of keeping a single value, the count is distributed across a Cell array, and different threads hit different slots of the array. To get the actual long value, return the sum of the values in all slots plus base (trading space for time).

Example

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class Demo {
    public volatile int i = 0;
    public AtomicInteger atomicInteger = new AtomicInteger();
    public LongAdder longAdder = new LongAdder();
    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    // Increment inside a synchronized method
    public synchronized void syc_add() {
        i++;
    }

    // Increment inside a synchronized block
    public void syc_in_add() {
        synchronized (this) {
            i++;
        }
    }

    public void atomic() {
        atomicInteger.incrementAndGet();
    }

    public void add() {
        longAdder.increment();
    }

    public void add_LongAccumulator() {
        longAccumulator.accumulate(1);
    }
}

public class LongAdderDemo {
    public static final int SIZE_THREAD = 50;
    public static final int _1W = 10000;

    public static void main(String[] args) throws InterruptedException {
        Demo demo = new Demo();
        long startTime;
        long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);

        // 1. synchronized method
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        demo.syc_add();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms" + "\t syc_add()" + "\t" + demo.i);

        // 2. synchronized block (reset the shared counter first)
        demo.i = 0;
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        demo.syc_in_add();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms" + "\t syc_in_add()" + "\t" + demo.i);

        // 3. AtomicInteger
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        demo.atomic();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms" + "\t atomic()" + "\t" + demo.atomicInteger.get());

        // 4. LongAccumulator
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        demo.add_LongAccumulator();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms" + "\t add_LongAccumulator()" + "\t" + demo.longAccumulator.get());

        // 5. LongAdder
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        demo.add();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch5.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch5.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms" + "\t add()" + "\t" + demo.longAdder.sum());
    }
}

Source code analysis

When multiple threads update the value at the same time, each thread uses its own hash value to map to an index of the cells array and increments the value stored in that slot. To read the result, add up the values of all cells and the base value.

Value = Base + Cell[1] + … + Cell[N]
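The idea above can be sketched in a few lines. This is a minimal illustration, not the JDK implementation: the class name StripedCounterSketch, the fixed slot count SLOTS, and the ThreadLocal-based hash are all assumptions made for the example. The real Striped64 creates and grows its cells lazily.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Sketch only: a fixed-size striped counter (hypothetical, not Striped64).
class StripedCounterSketch {
    // Assumed slot count; a power of two so (hash & (SLOTS - 1)) is a valid index.
    private static final int SLOTS = 8;
    // Per-thread hash value, standing in for the probe Striped64 keeps per thread.
    private static final ThreadLocal<Integer> HASH =
            ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt());

    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(SLOTS);

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return; // no contention: the update lands in base
        }
        // Contention on base: update the slot this thread hashes to instead.
        cells.addAndGet(HASH.get() & (SLOTS - 1), x);
    }

    // Value = Base + Cell[1] + ... + Cell[N]
    long sum() {
        long sum = base.get();
        for (int i = 0; i < cells.length(); i++) {
            sum += cells.get(i);
        }
        return sum;
    }

    // Starts `threads` threads that each add 1 `perThread` times, then returns the sum.
    static long run(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.add(1);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        // Every add lands in exactly one place, so the total is exact once all threads finish.
        System.out.println(run(4, 100_000)); // prints 400000
    }
}
```

Each update is applied exactly once, either to base or to one slot, so the sum is exact after all writers have finished even though no single variable holds the total.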

Inheritance diagram: LongAdder and LongAccumulator both extend Striped64, which in turn extends Number.

Striped64 (important fields and methods)

  • base: similar to the single value in AtomicLong. When there is no contention, updates are added directly to base. It is also the fallback when a thread cannot take the lock while cells is being initialized.

  • collide: indicates the intent to expand. false means cells will definitely not be expanded; true means it may be expanded. (This is a local variable in longAccumulate rather than a field.)

  • cellsBusy: the lock that must be held when initializing or expanding cells. 0 means unlocked; 1 means the lock is held by another thread.

  • casCellsBusy(): changes cellsBusy from 0 to 1 with a CAS operation. A successful CAS means the lock was acquired, and the method returns true.

  • NCPU: the number of CPUs on the machine, used as the bound when the Cell array is expanded.

  • getProbe(): returns the hash value of the current thread.

  • advanceProbe(): recomputes the hash value of the current thread.
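The cellsBusy and casCellsBusy() pair described above behaves like a small try-lock. The sketch below models it with an AtomicInteger; the class name CellsBusyLockSketch and the helper tryWithLock are hypothetical and exist only for illustration (Striped64 itself uses a volatile int field updated through a CAS).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: models the cellsBusy flag as a CAS-based try-lock.
class CellsBusyLockSketch {
    // 0 = unlocked, 1 = held by some thread (stands in for Striped64's cellsBusy).
    private final AtomicInteger cellsBusy = new AtomicInteger(0);

    // CAS from 0 to 1; returns true only for the caller that acquires the lock.
    boolean casCellsBusy() {
        return cellsBusy.compareAndSet(0, 1);
    }

    void unlock() {
        cellsBusy.set(0);
    }

    // Hypothetical helper: runs the action only if the lock could be taken, never blocks.
    boolean tryWithLock(Runnable action) {
        if (cellsBusy.get() == 0 && casCellsBusy()) {
            try {
                action.run();
                return true;
            } finally {
                unlock(); // always release, even if the action throws
            }
        }
        return false; // lock was busy: the caller has to retry or take another path
    }

    public static void main(String[] args) {
        CellsBusyLockSketch lock = new CellsBusyLockSketch();
        System.out.println(lock.casCellsBusy()); // true: lock acquired
        System.out.println(lock.casCellsBusy()); // false: already held
        lock.unlock();
        System.out.println(lock.casCellsBusy()); // true: acquired again after release
    }
}
```

A thread that fails the CAS does not wait for the lock; it moves on and tries something else, which is why the lock can stay this simple.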

The add method

  • cs (the variable is called "as" in the JDK 8 source): the reference to cells

  • b: the base value that was read

  • v: the expected value of the hit cell, used in the CAS

  • m: the length of the cells array minus 1, used as an index mask

  • c (the variable is called "a" in the JDK 8 source): the cell hit by the current thread

  • (cs = cells) != null: checks whether cells has been initialized

  • casBase(b = base, b + x): tries to add x to base with a CAS and returns whether the update succeeded

  • (m = cs.length - 1) < 0: checks whether the cells array is empty

  • cs == null: checks whether the cells array is null

  • (c = cs[getProbe() & m]) == null: maps the hash value of the current thread to a slot and checks whether the cell in that slot is null

  • (uncontended = c.cas(v = c.value, v + x)): tries to CAS the new value into the hit cell. A failure means there is contention on that cell, so longAccumulate is called, which may expand cells.
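To see how these conditions fit together, here is a sketch that keeps the same order of checks. It is an approximation under stated assumptions, not the JDK source: AddPathSketch, PROBE, SLOTS, and slowPath are hypothetical names, AtomicLong stands in for Cell, an AtomicReferenceArray stands in for Cell[], and the slow path is a plain synchronized method instead of longAccumulate.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch only: mirrors the order of checks in add(), with a simplified slow path.
class AddPathSketch {
    private static final int SLOTS = 8; // assumed fixed size; the real array grows
    // Stand-in for getProbe(): one random hash value per thread.
    private static final ThreadLocal<Integer> PROBE =
            ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt());

    private final AtomicLong base = new AtomicLong();
    private volatile AtomicReferenceArray<AtomicLong> cells; // null until first contention

    void add(long x) {
        AtomicReferenceArray<AtomicLong> cs; long b, v; int m; AtomicLong c;
        // Skip base if cells already exists; otherwise try one CAS on base.
        if ((cs = cells) != null || !base.compareAndSet(b = base.get(), b + x)) {
            boolean uncontended = true;
            if (cs == null || (m = cs.length() - 1) < 0   // cells missing or empty
                    || (c = cs.get(PROBE.get() & m)) == null // this thread's slot is empty
                    || !(uncontended = c.compareAndSet(v = c.get(), v + x))) { // cell CAS lost
                slowPath(x, uncontended);
            }
        }
    }

    // Simplified replacement for longAccumulate: one lock, no rehash, no expansion.
    // The real method uses wasUncontended to decide whether to rehash first; here it is ignored.
    private synchronized void slowPath(long x, boolean wasUncontended) {
        AtomicReferenceArray<AtomicLong> cs = cells;
        if (cs == null) {
            cells = cs = new AtomicReferenceArray<>(SLOTS);
        }
        int i = PROBE.get() & (cs.length() - 1);
        AtomicLong c = cs.get(i);
        if (c == null) {
            cs.set(i, new AtomicLong(x)); // cells are only installed under the lock
        } else {
            c.addAndGet(x);
        }
    }

    long sum() {
        long sum = base.get();
        AtomicReferenceArray<AtomicLong> cs = cells;
        if (cs != null) {
            for (int i = 0; i < cs.length(); i++) {
                AtomicLong c = cs.get(i);
                if (c != null) sum += c.get();
            }
        }
        return sum;
    }

    // Starts `threads` threads that each add 1 `perThread` times, then returns the sum.
    static long run(int threads, int perThread) {
        AddPathSketch counter = new AddPathSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.add(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 100_000)); // prints 800000
    }
}
```

With a single thread the CAS on base always succeeds, so cells is never created; the array only appears once a CAS on base has failed.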

The longAccumulate method

Case 1: the Cell[] array has been initialized

Case 2: the Cell[] array has not been initialized

Case 3: the Cell[] array is being initialized

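Case 2

Acquire the cellsBusy lock, create a Cell[] array of length 2, and put a new Cell holding the value into the slot that the current thread's hash maps to.

Case 1

Check whether the slot that the current thread's hash maps to is empty. If it is, create a Cell, put it into the array, and break out of the loop. If it is not, continue the loop.

Case 3

Fall back to base: try to CAS the value directly into base.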
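The three cases can be put together in one retry loop. The sketch below is a simplified model and makes several assumptions: AccumulateSketch is a hypothetical class, AtomicLong stands in for Cell, a random value per call stands in for the thread's probe, and the array stays at length 2 because expansion (the collide flag and the NCPU bound) is left out.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch only: the three branches of the retry loop, without array expansion.
class AccumulateSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicInteger cellsBusy = new AtomicInteger(); // 0 = free, 1 = held
    private volatile AtomicReferenceArray<AtomicLong> cells;     // null until initialized

    void accumulate(long x) {
        int h = ThreadLocalRandom.current().nextInt(); // assumed stand-in for getProbe()
        for (;;) {
            AtomicReferenceArray<AtomicLong> cs = cells;
            if (cs != null && cs.length() > 0) {
                // Case 1: cells is initialized.
                int i = h & (cs.length() - 1);
                AtomicLong c = cs.get(i);
                if (c == null) {
                    // Empty slot: install a new cell under the lock.
                    if (cellsBusy.get() == 0 && cellsBusy.compareAndSet(0, 1)) {
                        try {
                            if (cs.get(i) == null) { // recheck under the lock
                                cs.set(i, new AtomicLong(x));
                                return;
                            }
                        } finally {
                            cellsBusy.set(0);
                        }
                    }
                } else {
                    long v = c.get();
                    if (c.compareAndSet(v, v + x)) {
                        return;
                    }
                }
                // Lost a race: rehash (xorshift, in the spirit of advanceProbe) and retry.
                h ^= h << 13;
                h ^= h >>> 17;
                h ^= h << 5;
            } else if (cellsBusy.get() == 0 && cells == cs && cellsBusy.compareAndSet(0, 1)) {
                // Case 2: cells is not initialized and this thread took the lock.
                try {
                    if (cells == cs) {
                        AtomicReferenceArray<AtomicLong> rs = new AtomicReferenceArray<>(2);
                        rs.set(h & 1, new AtomicLong(x));
                        cells = rs;
                        return;
                    }
                } finally {
                    cellsBusy.set(0);
                }
            } else {
                // Case 3: another thread is initializing cells, so fall back to base.
                long b = base.get();
                if (base.compareAndSet(b, b + x)) {
                    return;
                }
            }
        }
    }

    long sum() {
        long sum = base.get();
        AtomicReferenceArray<AtomicLong> cs = cells;
        if (cs != null) {
            for (int i = 0; i < cs.length(); i++) {
                AtomicLong c = cs.get(i);
                if (c != null) sum += c.get();
            }
        }
        return sum;
    }

    // Starts `threads` threads that each accumulate 1 `perThread` times, then returns the sum.
    static long run(int threads, int perThread) {
        AccumulateSketch acc = new AccumulateSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) acc.accumulate(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 100_000)); // prints 800000
    }
}
```

On the first call the array does not exist yet, so the thread takes the Case 2 branch; later calls go through Case 1, and Case 3 is only reached while another thread holds the lock during initialization.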

The sum method

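sum() iterates over cells and adds each value to base. No lock is taken, so updates that happen while the iteration is running may be missed. The result is eventually consistent rather than strongly consistent.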
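A small experiment with the real LongAdder shows what this means in practice. The class name SumConsistencyDemo and the helper observe are made up for this example. Values read from sum() while writers are still running are only snapshots, while the value read after every writer has finished is exact.

```java
import java.util.concurrent.atomic.LongAdder;

// Sketch only: samples sum() while writers are still incrementing.
class SumConsistencyDemo {
    // Returns {largest sum() seen while writers were running, sum() after they all finished}.
    static long[] observe(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] writers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            writers[t] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment();
                }
            });
            writers[t].start();
        }

        long maxSeen = 0;
        boolean anyAlive = true;
        while (anyAlive) {
            // A snapshot: it may already be stale by the time it is returned.
            maxSeen = Math.max(maxSeen, adder.sum());
            anyAlive = false;
            for (Thread w : writers) {
                anyAlive |= w.isAlive();
            }
        }

        for (Thread w : writers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // No writer is running any more, so this value is exact.
        return new long[] { maxSeen, adder.sum() };
    }

    public static void main(String[] args) {
        long[] r = observe(4, 1_000_000);
        System.out.println("largest value seen while writing: " + r[0]);
        System.out.println("value after all writers finished: " + r[1]);
    }
}
```

Because the writers only increment, every sampled value lies between 0 and the final total, but how far a sample lags behind varies from run to run.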