A, AtomicInteger

/ * * *@authorJava and algorithm learning: Monday */
public class AtomicInteger {

    // Don't use volatile
    public AtomicInteger count = new AtomicInteger(0);

    // synchronized
    public void m(a) {
        for (int i = 0; i < 1000; i++) {
            //count++count.incrementAndGet(); }}public static void main(String[] args) {
        T01_AtomicInteger t = new T01_AtomicInteger();

        List<Thread> threadList = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            threadList.add(new Thread(t::m, "t" + i));
        }

        threadList.forEach((o)->{
            o.start();
        });

        threadList.forEach((o)->{
            try {
                o.join();
            } catch(InterruptedException e) { e.printStackTrace(); }}); System.out.println(t.count); }}Copy the code

1. Low-level implementation

Compare And Swap/Set (CAS) Lock-free optimization And optimistic locking

cas(V, Expected, NewValue)

if V == E

V = New

otherwise try again or fail

V: The value to be changed

Expected: What is Expected of the current value

NewValue: indicates the NewValue to be set

For example, the value to be changed is 3 (that is, the value I got at the beginning was 3). When PERFORMING the CAS operation, I Expected the value to be 3. If the CAS does not equal my expected value of 3 when executing, the value has been changed by another thread (let’s say 4), then I try again. At this point, I expect the value to be 4, and if no other thread changes the value to be equal to my expected value of 4 while performing the CAS operation, THEN I perform the CAS operation to change the value to the new value.

(1) If another thread changes the value to another value immediately after the CAS operation is performed, wouldn’t that still be a problem?

CAS operations are supported by CPU primitives. That is, CAS operations are CPU instruction level operations and cannot be changed.

(2) ABA problems

(Just like when your girlfriend gets back together with you and finds another boyfriend in the middle, she may not be your original girlfriend anymore.)

When I performed the CAS operation, the value was changed to 2 by another thread, and then changed to 3. If it’s a basic type, don’t worry about it. If you want to process, you need to add the version number, that is, any time this value is changed, the version number is increased by 1, later check with the version number.

(3) How does the bottom layer of CAS do it?

Unsafe is done via the sun.misc.Unsafe class (most methods are native). Main functions of this class: Direct manipulation of JVM memory (Native Locatememory), direct generation of class instances (Native allocateInstance), direct manipulation of variables (native getInt, native GetObject), and CAS related operations (Native compareAndSwapObject). This class can only be used by reflection or getUnsafe to retrieve objects of the class (singleton), not directly.

SyncLong VS AtomicLong VS LongAdder

/ * * *@authorJava and algorithm learning: Monday */
public class AtomicSynclongLongAdder {
    private static AtomicLong count1 = new AtomicLong(0L);
    private static long count2 = 0L;
    private static LongAdder count3 = new LongAdder();

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[1000];

        //AtomicLong
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100000; j++) { count1.incrementAndGet(); }}); }long start = System.currentTimeMillis();
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        long end = System.currentTimeMillis();
        System.out.println("AtomicLong: " + count1.get() + " time: " + (end - start));

        //long
        Object o = new Object();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    synchronized(o) { count2++; }}}); } start = System.currentTimeMillis();for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        end = System.currentTimeMillis();
        System.out.println("Long: " + count2 + " time: " + (end - start));

        //LongAdder
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100000; j++) { count3.increment(); }}); } start = System.currentTimeMillis();for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        end = System.currentTimeMillis();
        System.out.println("LongAdder: " + count3.longValue() + " time: "+ (end - start)); }}Copy the code

LongAdder uses segmented locking internally (also CAS implementation internally). It puts the values in an array, say, of size 4, and locks 250 threads in each array, performs an operation on each array, and sums up the results of all the arrays. So LongAdder is particularly advantageous when it comes to high concurrency.

Second, the already

Reentrantlock can replace synchronized. You can use ReentrantLock to perform the same functions as synchronized, but you must manually release the lock. If an exception is encountered with synchronized locking, the JVM automatically releases the lock, but ReentrantLock must release the lock manually, so it is often released in finally.

2. With reentrantLock, you can try locking, which cannot be locked or cannot be locked for a specified period of time, and the thread can decide whether to continue waiting.

Reentrantlock can be specified as a fair lock. ReentrantLock lock=new ReentrantLock(true); New a fair lock. The default is unfair lock.

1. Fair lock and unfair lock

Fair lock: if a new thread first determines whether there are waiting threads in the waiting queue for the lock, then it will enter the waiting queue and wait for the previous thread to run first. If there is no new thread, it will directly grab the lock. This lock is fair lock. First come, last come.

Unfair lock: If a new thread tries to grab the lock without checking whether there are threads waiting in the queue, this is an unfair lock. Synchronized is an unfair lock.

Whether the incoming thread checks the queue is the key to fair and unfair locking.

Reentrantlock VS synchronized

1. Reentrantlock can replace synchronized

2. Reentrantlock must be manually closed, and the JVM automatically releases the lock when synchronized completes or fails

3. Reentrantlock is implemented through CAS, while synchronized is essentially lock upgrade

4. Reentrantlock can be attempted by tryLock

5. Reentrantlock can switch between a fair lock and an unfair lock, and synchronized are both unfair locks

CountDownLatch

/ * * *@authorJava and algorithm learning: Monday */
public class TestCountDownLatch {

    public static void usingCountDownLatch(a) {
        Thread[] threads = new Thread[10];
        CountDownLatch countDownLatch = new CountDownLatch(threads.length);

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int count = 0;
                for (int j = 0; j < 10000; j++) {
                    count++;
                }
                System.out.println(count);
                countDownLatch.countDown();
            });
        }

        for (Thread thread : threads) {
            thread.start();
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("CountDownLatch end...");
    }

    public static void main(String[] args) { usingCountDownLatch(); }}Copy the code

The initial size of countDownLatch is defined as 10, and the latch value decreases by 1 for each additional thread (countdownlatch.countdown ()), and waits (countdownlatch.await ()) until the latch value is zero.

4. CyclicBarrier

/ * * *@authorJava and algorithm learning: Monday */
public class TestCyclicBarrier {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(20, ()->{
            System.out.println("Full, go.");
        });

        for (int i = 0; i < 100; i++) {
            new Thread(()->{
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch(BrokenBarrierException e) { e.printStackTrace(); } }).start(); }}}Copy the code

CyclicBarrier size is defined as 20 and is executed once until 20 threads are reached, otherwise waiting (barrier.await()). That is, the above program will output 5 full, start.

Five, the Phaser

/ * * *@authorJava and algorithm learning: Monday */
public class TestPhaser {
    private static MarriagePhaser marriagePhaser = new MarriagePhaser();

    public static void sleep(a) {
        try {
            TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
        } catch(InterruptedException e) { e.printStackTrace(); }}static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("Everyone is here." + registeredParties);
                    System.out.println();
                    return false;
                case 1:
                    System.out.println("Everybody finish." + registeredParties);
                    System.out.println();
                    return false;
                case 2:
                    System.out.println("Everybody out." + registeredParties);
                    System.out.println();
                    return false;
                case 3:
                    System.out.println("Hugs for the bride and groom." + registeredParties);
                    return true;
                default:
                    return true; }}}static class Person implements Runnable {
        String name;

        public Person(String name) {
            this.name = name;
        }

        private void arrive(a) {
            sleep();
            System.out.println(name + "Arriving at the scene");
            marriagePhaser.arriveAndAwaitAdvance();
        }

        private void eat(a) {
            sleep();
            System.out.println(name + "Eat");
            marriagePhaser.arriveAndAwaitAdvance();
        }

        private void leave(a) {
            sleep();
            System.out.println(name + "Left");
            marriagePhaser.arriveAndAwaitAdvance();
        }

        private void hug(a) {
            if ("Groom".equals(name) || "The bride".equals(name)) {
                sleep();
                System.out.println(name + " 拥抱");
                marriagePhaser.arriveAndAwaitAdvance();
            } else{ marriagePhaser.arriveAndDeregister(); }}@Override
        public void run(a) { arrive(); eat(); leave(); hug(); }}public static void main(String[] args) {
        marriagePhaser.bulkRegister(7);

        for (int i = 0; i < 5; i++) {
            new Thread(new Person("person" + i)).start();
        }

        new Thread(new Person("Groom")).start();
        new Thread(new Person("The bride")).start(); }}Copy the code

It’s like a fence group, a phased fence. When all threads have reached a certain barrier, the next operation will be performed.

Six, ReadWriteLock

Read is a shared lock

Write with exclusive locks (mutex)

If the reader thread first holds the lock, the second reader thread can read it together. When the second writer thread comes in, it blocks, doesn’t allow you to write, and I’ll write when I’m done.

At first, if the writer thread holds the lock, no matter whether the second thread is reading or writing, it blocks, and I have to wait for the other thread to read, and the other thread can write.

/ * * *@authorJava and algorithm learning: Monday */
public class TestReadWriteLock {
    private static int value;

    private static ReentrantLock lock = new ReentrantLock();

    private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();

    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("read...");
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public static void write(Lock lock, int v) {
        try {
            lock.lock();
            value = v;
            Thread.sleep(1000);
            System.out.println("write..." + value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}public static void main(String[] args) throws InterruptedException {
        // There are 8 read threads
        Thread[] tRead = new Thread[8];
        for (int i = 0; i < 8; i++) {
            tRead[i] = new Thread(() -> {
                read(readLock);
            });
        }
        long start = System.currentTimeMillis();
        for (Thread t : tRead) {
            t.start();
        }
        for (Thread t : tRead) {
            t.join();
        }

        // Write two threads
        Thread[] tWrite = new Thread[2];
        for (int i = 0; i < 2; i++) {
            tWrite[i] = new Thread(() -> {
                write(writeLock, new Random().nextInt(10));
            });
        }
        for (Thread t : tWrite) {
            t.start();
        }
        for (Thread t : tWrite) {
            t.join();
        }

        long end = System.currentTimeMillis();
        System.out.println("total time: "+ (end - start)); }}Copy the code

If Reentrantlock is used to lock, the program will run for 10s, that is, Reentrantlock reads and writes are exclusive locks.

If you use ReadWriteLock to lock, the program runs for 3s. That is, ReadWriteLock is a shared lock for reads and an exclusive lock for writes.

7. Semaphore

Semaphore is initially assigned a default value that indicates how many threads can be allowed to run simultaneously. It means limiting the current. Semaphore is an unfair lock by default. When new Semaphore(1, true) new is set to true, it indicates a fair lock.

/ * * *@authorJava and algorithm learning: Monday */
public class TestSemaphore {
    public static void main(String[] args) {
        Semaphore s = new Semaphore(1);

        new Thread(()->{
            try {
                s.acquire();
                System.out.println("t1...");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("t1...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally{ s.release(); }},"t1").start();

        new Thread(()->{
            try {
                s.acquire();
                System.out.println("t2...");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("t2...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally{ s.release(); }},"t2").start(); }}Copy the code

Eight, Exchanger

/ * * *@authorJava and algorithm learning: Monday */
public class TestExchanger {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "" + s);
        }, "t1").start();

        new Thread(() -> {
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "" + s);
        }, "t2").start(); }}Copy the code

The exchange() method blocks. One thread calls the Exchange () method and it blocks until the second thread calls the exchange() method. Then the first thread will execute down.