This is the second part of the “Personal Vomiting blood series – Summing up Java Multithreading (1)” article. It continues working through the outline diagram from the first part.

Concurrent collection container

Why is ArrayList not thread-safe?

Look at the source of the add method:

public boolean add(E e) {
    /*
     * Adding an element takes two steps:
     * 1. Check whether the backing array has enough capacity and grow it if necessary.
     * 2. Store the element in the backing array.
     */
    ensureCapacityInternal(size + 1);  // Increments modCount!! A race after this check can make the next line index out of bounds
    elementData[size++] = e;           // A race here can leave a null slot
    return true;
}

Array out of bounds:

  1. The list holds 9 elements, so size=9, and elementData has length 10.
  2. Thread A enters add, reads size as 9, and calls ensureCapacityInternal to check the capacity.
  3. Thread B enters add at the same time, also reads size as 9, and also calls ensureCapacityInternal.
  4. Thread A finds that the required capacity is 10 and that elementData, with length 10, can hold it, so it returns without growing the array.
  5. Thread B reaches the same conclusion and also returns without growing the array.
  6. Thread A executes elementData[size++] = e, which writes to index 9 and changes size to 10.
  7. Thread B executes the same statement. It tries to write elementData[10] = e, but the array was never grown and its largest index is 9, so an ArrayIndexOutOfBoundsException is thrown.

Null value case:

elementData[size++] = e is not an atomic operation. It is roughly equivalent to two steps:

  1. elementData[size] = e;
  2. size = size + 1;

Logic:

  1. The list is empty, i.e. size=0.
  2. Thread A starts adding an element with the value A. It performs the first step and places A at index 0 of elementData.
  3. Thread B then starts adding an element with the value B and reaches the first step. The size it reads is still 0, so it also writes to index 0 and overwrites A with B.
  4. Thread A increments size to 1.
  5. Thread B increments size to 2.

Once both threads finish, the ideal result would be size=2 with A at index 0 and B at index 1. The actual result is size=2 with B at index 0 and nothing at index 1. Index 1 stays null until set is called on it, because with size=2 the next add writes to index 2.

What are the thread-safe alternatives to the unsafe collections?

ArrayList->Vector->SynchronizedList->CopyOnWriteArrayList

HashSet->SynchronizedSet->CopyOnWriteArraySet

HashMap->SynchronizedMap->ConcurrentHashMap
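The list alternatives above can be tried out with a small sketch. The class name SafeListDemo and the helper fill are made up for this illustration. Several threads add to the same list, and with a thread-safe list the final size is always threads × perThread. Passing a plain ArrayList instead may produce a smaller size or an ArrayIndexOutOfBoundsException, as described earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SafeListDemo {
    // Hypothetical helper: each of `threads` threads adds `perThread` elements,
    // then the final size of the list is returned.
    static int fill(List<Integer> list, int threads, int perThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    list.add(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return list.size();
    }

    public static void main(String[] args) {
        // Both wrappers make add() safe to call from several threads
        System.out.println(fill(Collections.synchronizedList(new ArrayList<>()), 4, 1000));
        System.out.println(fill(new CopyOnWriteArrayList<>(), 4, 1000));
    }
}
```

CopyOnWriteArrayList copies the backing array on every write, so it suits read-heavy lists. For write-heavy lists the synchronized wrapper is usually the cheaper choice.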

Concurrent synchronizers

AQS principle

AQS (AbstractQueuedSynchronizer) uses an int member variable to represent the synchronization state and a built-in FIFO queue to line up the threads waiting to acquire the resource. It uses CAS to update the synchronization state atomically.

private volatile int state; // Shared variable; volatile guarantees visibility across threads

The state is read and modified through the protected methods getState, setState, and compareAndSetState:

// Returns the current value of the synchronization state
protected final int getState() {
    return state;
}

// Sets the value of the synchronization state
protected final void setState(int newState) {
    state = newState;
}

// Atomically sets the synchronization state to update if the current value equals expect
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS defines two modes of resource sharing:

  1. Exclusive: only one thread can hold the resource at a time, as with ReentrantLock. ReentrantLock supports both fair and unfair locks.

    Summary: there are only two differences between fair and unfair locks:

    1. After lock is called, an unfair lock immediately tries to grab the lock with CAS. If the lock happens to be free at that moment, the thread acquires it and returns.
    2. After that CAS fails, an unfair lock enters the tryAcquire method just as a fair lock does. In tryAcquire, if the lock turns out to have been released (state == 0), the unfair lock tries to seize it directly with CAS. The fair lock first checks whether any thread is waiting in the queue and, if so, joins the back of the queue instead of grabbing the lock.
  2. Shared: multiple threads can hold the resource at the same time, as with Semaphore and CountDownLatch. The read lock of ReentrantReadWriteLock is also acquired in shared mode, while CyclicBarrier is built on ReentrantLock and a Condition rather than on AQS directly.

AQS is built on the template method pattern. To write a custom synchronizer, override the hook methods below, which the AQS template methods call:

isHeldExclusively()    // Whether the current thread holds the resource exclusively. Only needed if you use a Condition.
tryAcquire(int)        // Exclusive mode. Tries to acquire the resource; returns true on success and false on failure.
tryRelease(int)        // Exclusive mode. Tries to release the resource; returns true on success and false on failure.
tryAcquireShared(int)  // Shared mode. Tries to acquire the resource. A negative number means failure; 0 means success with no resources left; a positive number means success with resources still available.
tryReleaseShared(int)  // Shared mode. Tries to release the resource; returns true if the release may let a waiting thread proceed and false otherwise.
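To make the hook methods concrete, here is a minimal sketch of a non-reentrant mutex built on AQS in exclusive mode. The class name SimpleMutex and its method names are invented for this illustration; state 0 means unlocked and state 1 means locked. It is not how ReentrantLock itself is written, only the smallest synchronizer that exercises tryAcquire, tryRelease, and isHeldExclusively.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // The synchronizer: AQS handles queuing and parking, we only define the state rules
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS 0 -> 1; only one thread can win
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner gets here
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }               // template method: calls tryAcquire, queues on failure
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }             // template method: calls tryRelease, wakes a waiter
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        try {
            System.out.println("held: " + mutex.isHeldByCurrentThread());
        } finally {
            mutex.unlock();
        }
    }
}
```

Because tryAcquire only accepts the 0 to 1 transition, a second tryLock from the owning thread fails. Making the lock reentrant would mean counting acquisitions in state, which is the approach ReentrantLock takes.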

CountDownLatch

CountDownLatch is built on the shared mode of AQS. Its constructor initializes the AQS state to count. Each call to countDown goes through tryReleaseShared, which decrements state in a CAS loop; once state reaches 0, countDown has been called count times. When a thread calls await while state is not 0, some countDown calls are still outstanding, so the calling thread is placed in the AQS wait queue and parked. When the final countDown brings state to 0, the parked threads are woken up, see that state == 0, and continue executing.

Three uses:

  1. A thread waits for n threads to finish before it continues. Initialize the counter to n with new CountDownLatch(n). Each time a task thread finishes, it decrements the counter by calling countDownLatch.countDown(). When the counter reaches 0, the thread blocked in await() on the CountDownLatch is woken up. A typical scenario is service startup, where the main thread waits for several components to load before resuming.
  2. Maximize parallelism at the moment several threads start a task. The emphasis is on parallelism rather than concurrency: the threads should start executing at the same instant. It is like a race in which the runners line up at the start, wait for the starting gun, and then set off together. To do this, create a shared CountDownLatch whose counter is 1 with new CountDownLatch(1). Each thread calls countDownLatch.await() before it starts its task, and when the main thread calls countDown() the counter drops to 0 and all of the threads wake up together.
  3. Deadlock detection: a convenient scenario is to have n threads access a shared resource, vary the number of threads in each test phase, and try to provoke a deadlock.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        countDownLatchTest();
        // general();
    }

    // Without a latch: poll until the worker threads are gone
    public static void general() {
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " finished self-study and left the classroom");
            }, "Thread --> " + i).start();
        }
        // The threshold 2 assumes that only main and one IDE monitor thread remain
        while (Thread.activeCount() > 2) {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        System.out.println(Thread.currentThread().getName() + " ==== the class monitor leaves last");
    }

    // With a latch: main blocks in await() until all 6 threads have called countDown()
    public static void countDownLatchTest() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " finished self-study and left the classroom");
                countDownLatch.countDown();
            }, "Thread --> " + i).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + " ==== the class monitor leaves last");
    }
}
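The demo above covers the first use. The second use, the starting gun, might look like the sketch below. StartGateDemo, race, and the runner names are invented for this illustration. Every runner parks in startGate.await(), so none of them can record a start until the main thread calls countDown() once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {
    // Returns {runners started before the gate opened, runners started in total}
    static int[] race(int runners) {
        CountDownLatch startGate = new CountDownLatch(1);        // the starting gun
        CountDownLatch finishLine = new CountDownLatch(runners); // lets main wait for everyone
        AtomicInteger started = new AtomicInteger();
        for (int i = 0; i < runners; i++) {
            new Thread(() -> {
                try {
                    startGate.await();          // every runner parks here
                    started.incrementAndGet();  // reached only after the gate opens
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finishLine.countDown();
                }
            }, "Runner-" + i).start();
        }
        int startedEarly = started.get();       // gate still closed, so nobody has started
        startGate.countDown();                  // fire the starting gun: state goes 1 -> 0
        try {
            finishLine.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {startedEarly, started.get()};
    }

    public static void main(String[] args) {
        int[] result = race(5);
        System.out.println("started early: " + result[0] + ", started in total: " + result[1]);
    }
}
```

The sketch combines both uses: startGate is the one-shot gate from the second use, and finishLine is the wait-for-n latch from the first.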

CyclicBarrier

CyclicBarrier literally means a cyclic (reusable) barrier. It makes a group of threads block when they reach a barrier (also called a synchronization point). The barrier does not open until the last thread arrives, at which point all the blocked threads continue. The basic constructor is CyclicBarrier(int parties), where parties is the number of threads the barrier intercepts. Each thread calls await to tell the CyclicBarrier that it has reached the barrier, and then blocks.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        cyclicBarrierTest();
    }

    public static void cyclicBarrierTest() {
        // The barrier action runs once all 7 threads have arrived
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("==== Summon the Dragon ====");
        });
        for (int i = 0; i < 7; i++) {
            final int tempInt = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " collected Dragon Ball No. " + tempInt);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "" + i).start();
        }
    }
}

Calling await() on a CyclicBarrier actually calls dowait(false, 0L). await() acts like raising a barrier: it blocks the calling thread, and the barrier opens only when the number of blocked threads reaches parties, after which the threads continue.

    // Number of threads still to arrive. It starts at parties (7 in the example above), and the code after await() runs only once it reaches 0.
    private int count;

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            // If the thread has been interrupted, break the barrier and throw
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // Decrement count
            int index = --count;
            // count has reached 0: the last thread has arrived, so run the barrier action passed to the constructor
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Reset count to parties, wake up the waiting threads,
                    // and start the next generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

Summary: CyclicBarrier uses a count variable as its internal counter. It is initialized from the parties property and decremented by one each time a thread reaches the barrier. When count reaches zero, the arriving thread is the last one of this generation, so it runs the barrier action passed to the constructor (if any), and the barrier is then reset for the next generation.
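The reset performed by nextGeneration() is what makes the barrier cyclic. A small sketch of that reuse follows; BarrierReuseDemo and runRounds are names invented for this example. The same barrier is awaited for several rounds, and the barrier action is counted once per round.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Runs `rounds` rounds with `parties` threads on one barrier and
    // returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The action is run by the last thread to arrive in each generation
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, a new generation each round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

A CountDownLatch cannot be used this way, because its counter never goes back up once it reaches 0.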

Semaphore

synchronized and ReentrantLock allow only one thread to access a resource at a time, whereas Semaphore allows several threads to access a resource simultaneously.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3); // Simulate 3 parking spaces
        for (int i = 0; i < 6; i++) { // Simulate 6 cars
            new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return; // No permit was acquired, so there is nothing to release
                }
                try {
                    System.out.println(Thread.currentThread().getName() + " grabbed a parking space");
                    // Park for 3 seconds
                    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println(Thread.currentThread().getName() + " left after parking for 3 seconds");
                } finally {
                    semaphore.release();
                }
            }, "Car " + i).start();
        }
    }
}
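One way to check that the semaphore really caps concurrency is to record the highest number of threads inside the guarded section at once. The sketch below does that; SemaphorePeakDemo and peakConcurrency are invented names, and the 20 ms sleep only stands in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphorePeakDemo {
    // Returns the largest number of workers that were inside the guarded section together
    static int peakConcurrency(int permits, int workers) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit held, nothing to release
                }
                try {
                    // Record the new occupancy if it is the highest seen so far
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(20); // placeholder for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(3, 6)); // never above 3
    }
}
```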

Blocking queue

  • ArrayBlockingQueue: a bounded blocking queue backed by an array.
  • LinkedBlockingQueue: a bounded blocking queue backed by a linked list (the default capacity is Integer.MAX_VALUE).
  • PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering.
  • DelayQueue: an unbounded blocking queue of delayed elements, implemented with a priority queue.
  • SynchronousQueue: a blocking queue that stores no elements; each insert must wait for a matching remove.
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list.
  • LinkedBlockingDeque: a double-ended blocking queue backed by a linked list.

The core methods fall into four groups:

  • Throw an exception: add, remove
  • Return a special value instead of throwing: offer, poll
  • Block: put, take
  • Time out: offer and poll with a timeout
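The four method groups behave differently on a full or empty queue. The sketch below walks through them on an ArrayBlockingQueue with capacity 1. QueueApiDemo, demo, and the log strings are invented for this illustration, and the 50 ms timeouts are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueApiDemo {
    static List<String> demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> log = new ArrayList<>();
        try {
            log.add("add -> " + queue.add("a"));
            try {
                queue.add("b"); // queue is full: add throws
            } catch (IllegalStateException e) {
                log.add("add on full queue -> exception");
            }
            log.add("offer on full queue -> " + queue.offer("b"));
            log.add("timed offer on full queue -> " + queue.offer("b", 50, TimeUnit.MILLISECONDS));
            log.add("take -> " + queue.take());
            log.add("poll on empty queue -> " + queue.poll());
            try {
                queue.remove(); // queue is empty: remove throws
            } catch (NoSuchElementException e) {
                log.add("remove on empty queue -> exception");
            }
            queue.put("c"); // would block here if the queue were full
            log.add("put then timed poll -> " + queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

put and take are only called here when they cannot block, so the sketch stays single-threaded. In a real producer-consumer setup they are the calls that park a thread until space or data is available.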

Producer consumer

The synchronized version of producer-consumer is the most cumbersome

import java.util.LinkedList;

public class ProdConsumerSynchronized {

    private final LinkedList<String> lists = new LinkedList<>();

    public synchronized void put(String s) {
        while (lists.size() != 0) { // Use while, not if, so the condition is rechecked after every wakeup
            // The buffer already holds an element, so wait
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        lists.add(s);
        System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
        this.notifyAll(); // Wakes every waiting thread, including other producers
    }

    public synchronized void get() {
        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
        this.notifyAll(); // Wake every waiting thread; notify() could wake only another consumer and deadlock
    }

    public static void main(String[] args) {
        ProdConsumerSynchronized prodConsumerSynchronized = new ProdConsumerSynchronized();

        // Start the consumer threads
        for (int i = 0; i < 5; i++) {
            new Thread(prodConsumerSynchronized::get, "ConsA" + i).start();
        }

        // Start the producer threads
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                prodConsumerSynchronized.put("" + tempI);
            }, "ProdA" + i).start();
        }
    }
}

ReentrantLock

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProdConsumerReentrantLock {

    private final LinkedList<String> lists = new LinkedList<>();

    private final Lock lock = new ReentrantLock();

    // Separate conditions let us wake only producers or only consumers
    private final Condition prod = lock.newCondition();

    private final Condition cons = lock.newCondition();

    public void put(String s) {
        lock.lock();
        try {
            // 1. Check
            while (lists.size() != 0) {
                // The buffer is occupied, so the producer waits
                prod.await();
            }
            // 2. Work
            lists.add(s);
            System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
            // 3. Notify the consumers
            cons.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            // 1. Check
            while (lists.size() == 0) {
                // The buffer is empty, so the consumer waits
                cons.await();
            }
            // 2. Work
            System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
            // 3. Notify the producers
            prod.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProdConsumerReentrantLock prodConsumerReentrantLock = new ProdConsumerReentrantLock();
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                prodConsumerReentrantLock.put(tempI + "");
            }, "ProdA" + i).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(prodConsumerReentrantLock::get, "ConsA" + i).start();
        }
    }
}

BlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProdConsumerBlockingQueue {

    // volatile so that a stop request is visible to both worker threads
    private volatile boolean flag = true;

    private final AtomicInteger atomicInteger = new AtomicInteger();

    private final BlockingQueue<String> blockingQueue;

    public ProdConsumerBlockingQueue(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void myProd() throws Exception {
        String data = null;
        boolean retValue;
        while (flag) {
            data = atomicInteger.incrementAndGet() + "";
            // Wait up to 2 seconds for free space
            retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            if (retValue) {
                System.out.println(Thread.currentThread().getName() + " inserted " + data + " into the queue: success");
            } else {
                System.out.println(Thread.currentThread().getName() + " inserted " + data + " into the queue: failure");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " stop requested, flag=false, production ends");
    }

    public void myConsumer() throws Exception {
        String result = null;
        while (flag) {
            // Wait up to 2 seconds for an element
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null == result || result.equalsIgnoreCase("")) {
                flag = false;
                System.out.println(Thread.currentThread().getName() + " got nothing within 2 seconds, consumer exits");
                return;
            }
            System.out.println(Thread.currentThread().getName() + " consumed " + result + " from the queue: success");
        }
    }

    public void stop() {
        flag = false;
    }

    public static void main(String[] args) {
        ProdConsumerBlockingQueue prodConsumerBlockingQueue = new ProdConsumerBlockingQueue(new ArrayBlockingQueue<>(10));
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " producer thread started");
            try {
                prodConsumerBlockingQueue.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Prod").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " consumer thread started");
            try {
                prodConsumerBlockingQueue.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("5 seconds are up, main stops the demo");
        prodConsumerBlockingQueue.stop();
    }
}

Thread pool

Benefits of thread pools

  • “Reduce resource consumption.” Reduce the cost of thread creation and destruction by reusing created threads.
  • “Improve response speed”. When a task arrives, it can be executed immediately without waiting for the thread to be created.
  • Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.

FixedThreadPool

FixedThreadPool is a reusable thread pool with a fixed number of threads. Take a look at the relevant source code in the Executors class:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

Both corePoolSize and maximumPoolSize of a newly created FixedThreadPool are set to nThreads, the argument we pass in.

  1. If fewer than corePoolSize threads are running when a new task arrives, a new thread is created to execute it.
  2. Once the number of running threads equals corePoolSize, new tasks are added to the LinkedBlockingQueue.
  3. After a thread in the pool finishes its current task, it loops, repeatedly taking tasks from the LinkedBlockingQueue and executing them.

Not recommended: using the unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE) as the work queue of a FixedThreadPool has the following effects on the thread pool:

  1. Once the pool has corePoolSize threads, new tasks wait in the unbounded queue, so the number of threads never exceeds corePoolSize.
  2. With an unbounded queue, maximumPoolSize has no effect, because the queue can never fill up. That is why the source shows FixedThreadPool setting corePoolSize and maximumPoolSize to the same value.
  3. Because of 1 and 2, keepAliveTime also has no effect with an unbounded queue.
  4. A running FixedThreadPool (one on which neither shutdown() nor shutdownNow() has been called) never rejects tasks, so too many queued tasks can cause an OOM (out of memory) error.

SingleThreadExecutor

    /**
     * Returns a thread pool with only one thread
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

Same as FixedThreadPool, except that corePoolSize and maximumPoolSize are both set to 1.

CachedThreadPool

    /**
     * Creates a thread pool that creates new threads as needed but reuses previously created threads when they are available.
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

corePoolSize of CachedThreadPool is set to 0 and maximumPoolSize is set to Integer.MAX_VALUE, so the pool is effectively unbounded. If the main thread submits tasks faster than the pool's threads can process them, CachedThreadPool keeps creating new threads, which in extreme cases can exhaust CPU and memory resources.

  1. execute() first calls SynchronousQueue.offer(Runnable task) to submit the task to the queue. If an idle thread in the pool is currently executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), the main thread's offer pairs with the idle thread's poll, the task is handed to that thread, and execute() completes. Otherwise, go to step 2.
  2. When the pool is initially empty, or when it has no idle threads, no thread is executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), so the offer in step 1 fails. CachedThreadPool then creates a new thread to execute the task, and execute() completes.
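The pairing of offer and poll in the two steps above can be observed on a bare SynchronousQueue. In the sketch below, HandoffDemo and demo are invented names. The second offer uses a timeout only so that the example does not depend on how quickly the polling thread starts; execute() itself uses the untimed offer.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class HandoffDemo {
    // Returns {offer result with no poller, offer result with a poller waiting}
    static boolean[] demo() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Step 2 of the text: nobody is polling, so the untimed offer fails at once
        boolean offeredWithoutPoller = queue.offer("task");

        // Stand-in for an idle pool thread waiting for work
        Thread idleWorker = new Thread(() -> {
            try {
                queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        idleWorker.start();

        // Step 1 of the text: the offer pairs with the worker's poll and the task is handed over
        boolean offeredWithPoller;
        try {
            offeredWithPoller = queue.offer("task", 5, TimeUnit.SECONDS);
            idleWorker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            offeredWithPoller = false;
        }
        return new boolean[] {offeredWithoutPoller, offeredWithPoller};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("without poller: " + result[0] + ", with poller: " + result[1]);
    }
}
```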

ScheduledThreadPoolExecutor is omitted here because it is rarely used.

ThreadPoolExecutor (important)

    /**
     * Creates a new ThreadPoolExecutor with the given initial parameters.
     */
    public ThreadPoolExecutor(int corePoolSize,      // Number of core threads in the pool
                              int maximumPoolSize,   // Maximum number of threads in the pool
                              long keepAliveTime,    // When there are more threads than the core size, the maximum time an extra idle thread stays alive
                              TimeUnit unit,         // Time unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue,   // Queue holding tasks waiting to be executed
                              ThreadFactory threadFactory,         // Factory used to create threads; the default is usually fine
                              RejectedExecutionHandler handler     // Rejection policy, applied when tasks arrive faster than they can be handled; can be customized
                              ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The 3 most important parameters of ThreadPoolExecutor:

  • corePoolSize: the number of core threads, i.e. the minimum number of threads that can run simultaneously.
  • maximumPoolSize: the maximum number of threads. Once the queued tasks reach the queue capacity, the number of threads that can run simultaneously grows up to this value.
  • workQueue: when a new task arrives, the pool checks whether the number of running threads has reached the core size. If it has, the new task is placed in the queue.

Other common ThreadPoolExecutor parameters:

  • keepAliveTime: when the pool has more than corePoolSize threads and no new tasks are being submitted, the threads beyond the core are not destroyed immediately. They are reclaimed only after they have been idle for longer than keepAliveTime.
  • unit: the time unit of keepAliveTime.
  • threadFactory: used when the executor creates a new thread.
  • handler: the saturation policy, described separately below.
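How corePoolSize, workQueue, and maximumPoolSize interact can be watched directly. The sketch below uses an assumed configuration (2 core threads, 4 maximum threads, a queue of capacity 2) and tasks that block on a latch so that no thread becomes free while tasks are submitted. PoolGrowthDemo and observe are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    static List<String> observe() {
        CountDownLatch release = new CountDownLatch(1);
        // No handler given, so the default AbortPolicy applies
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        // Every task blocks until the latch opens, keeping its thread busy
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 7; i++) {
                pool.execute(blocked);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            }
        } catch (RejectedExecutionException e) {
            // 4 threads busy and 2 tasks queued: the pool is saturated
            log.add("rejected after " + log.size() + " tasks");
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Tasks 1 and 2 each start a core thread, tasks 3 and 4 go into the queue, tasks 5 and 6 start extra threads because the queue is full, and task 7 is rejected.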

[Figure: the relationship between thread pool parameters]

If the number of running threads has reached the maximum and the queue is also full, ThreadPoolExecutor provides the following saturation policies:

  • ThreadPoolExecutor.AbortPolicy: throws RejectedExecutionException to reject the new task.
  • ThreadPoolExecutor.CallerRunsPolicy: runs the rejected task in the thread that called execute, so no task request is dropped. This slows down the submission of new tasks and affects overall throughput. In effect it works like extra queue capacity, because the submitting thread absorbs the overflow. Choose it if your application can tolerate the delay and cannot afford to drop any task request.
  • ThreadPoolExecutor.DiscardPolicy: silently discards the new task.
  • ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest unprocessed task request.

Whether Spring creates the thread pool through ThreadPoolTaskExecutor or we create it directly through the ThreadPoolExecutor constructor, ThreadPoolExecutor.AbortPolicy is used by default when no RejectedExecutionHandler is specified. With this default, ThreadPoolExecutor throws RejectedExecutionException to reject a new task, which means the task is lost. For scalable applications, ThreadPoolExecutor.CallerRunsPolicy is recommended: when the pool is saturated, the submitting thread absorbs the overflow much like an elastic queue. (This is easy to confirm from the source of the ThreadPoolExecutor constructors, which is not repeated here.)
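A short sketch of CallerRunsPolicy in action follows. The pool is deliberately tiny (one thread, one queue slot) so that the third task overflows; CallerRunsDemo and threadOfOverflowTask are names invented for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns the name of the thread that ran the task submitted to a saturated pool
    static String threadOfOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // occupies the only worker thread
            pool.execute(blocked); // fills the only queue slot
            // Saturated: the policy runs this task in the submitting thread
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadOfOverflowTask());
    }
}
```

While the caller is busy running the overflow task it cannot submit more work, which is the back-pressure effect the policy description refers to.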

The thread pool objects returned by Executors have the following disadvantages:

  • FixedThreadPool and SingleThreadExecutor: the request queue may grow to Integer.MAX_VALUE entries, so a large backlog of requests can build up and cause an OOM.
  • CachedThreadPool and ScheduledThreadPool: the number of threads that may be created is Integer.MAX_VALUE, so a large number of threads can be created and cause an OOM.
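
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ExecutorService threadpools = new ThreadPoolExecutor(
                3,                                      // corePoolSize
                5,                                      // maximumPoolSize
                1L,                                     // keepAliveTime
                TimeUnit.SECONDS,                       // unit of keepAliveTime
                new LinkedBlockingDeque<>(3),           // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());  // saturation policy
        // Other saturation policies that can be passed instead:
        //   new ThreadPoolExecutor.CallerRunsPolicy();
        //   new ThreadPoolExecutor.DiscardOldestPolicy();
        //   new ThreadPoolExecutor.DiscardPolicy();
        try {
            // 8 tasks fit exactly: 5 threads at most + 3 queue slots
            for (int i = 0; i < 8; i++) {
                threadpools.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t Transact business");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadpools.shutdown();
        }
    }
}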


Java locking mechanism

Fair lock/unfair lock

“A fair lock means that multiple threads acquire the lock in the order in which they requested it. An unfair lock means that threads may acquire the lock in an order different from the order in which they requested it, which can cause priority inversion or starvation (a thread not getting the lock for a long time).” See ReentrantLock, which supports both modes.
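As a small sketch of how the two modes are chosen with ReentrantLock: the no-argument constructor gives an unfair lock, and passing true asks for a fair one. The class name FairLockSketch and its two helper methods are made up for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairLockSketch {

    // Default constructor: unfair lock (threads may barge ahead of waiters).
    public static ReentrantLock defaultLock() {
        return new ReentrantLock();
    }

    // Passing true requests a fair lock (longest-waiting thread is favored).
    public static ReentrantLock fairLock() {
        return new ReentrantLock(true);
    }

    public static void main(String[] args) {
        System.out.println(defaultLock().isFair()); // prints false
        System.out.println(fairLock().isFair());    // prints true
    }
}
```

A fair lock usually costs some throughput, which is probably why unfair is the default.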

Reentrant lock

A reentrant lock, also known as a recursive lock, means that when a thread has acquired the lock in an outer method, it automatically acquires the lock again when it enters an inner method guarded by the same lock.

synchronized void setA() throws Exception {
    Thread.sleep(1000);
    // setA() already holds the lock on this object, so the call to setB()
    // acquires the same lock again automatically. If the lock were not
    // reentrant, setB() would never get to execute.
    setB();
}

synchronized void setB() throws Exception {
    Thread.sleep(1000);
}

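The same behaviour can be observed with ReentrantLock, whose getHoldCount method reports how many times the current thread holds the lock. This is only a sketch; the class name ReentrancySketch and the methods outer and inner are made up for the illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancySketch {
    private static final ReentrantLock LOCK = new ReentrantLock();

    // Outer method: takes the lock, then calls a method that takes it again.
    public static int outer() {
        LOCK.lock();
        try {
            return inner();
        } finally {
            LOCK.unlock();
        }
    }

    // Inner method: the same thread re-enters the same lock without blocking.
    private static int inner() {
        LOCK.lock();
        try {
            return LOCK.getHoldCount(); // held twice by the current thread here
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(outer());         // prints 2
        System.out.println(LOCK.isLocked()); // prints false: every lock() was matched by unlock()
    }
}
```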

Exclusive lock/shared lock

  • Exclusive lock: The lock can only be held by one thread at a time.
  • Shared lock: The lock can be held by multiple threads.

Mutex/read-write lock

Exclusive lock/shared lock, as described above, is the general concept; mutex/read-write lock is its concrete implementation.
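A quick way to see the difference is ReentrantReadWriteLock: its read lock is shared and its write lock is exclusive. In the sketch below (class and method names are made up), one thread holds the read lock while a second thread probes both locks with tryLock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockSketch {

    // Returns {readAcquired, writeAcquired} as seen by a second thread
    // while the calling thread is holding the read lock.
    public static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock(); // the calling thread is now a reader
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();  // shared: another reader is allowed
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock(); // exclusive: refused while readers exist
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            other.join(); // join() also makes the results visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReadLocked();
        System.out.println("second reader got the read lock:  " + r[0]); // true
        System.out.println("second thread got the write lock: " + r[1]); // false
    }
}
```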

Optimistic lock/pessimistic lock

  1. Optimistic lock and pessimistic lock do not refer to specific types of lock, but to two ways of looking at concurrent synchronization.
  2. “A pessimistic lock assumes that concurrent operations on the same data will certainly modify it; even if no modification happens, it assumes one did. It therefore takes a lock for every concurrent operation on the same data. From the pessimistic point of view, concurrent operations without a lock are bound to cause problems.”
  3. “An optimistic lock assumes that concurrent operations on the same data will not modify it. When updating, it attempts the update and keeps retrying until it succeeds. From the optimistic point of view, concurrent operations without a lock are fine.”
  4. Pessimistic locks are suitable for scenarios with a lot of write operations, and optimistic locks are suitable for scenarios with a lot of read operations.
  5. “In Java, pessimistic locking means using the various locks. Optimistic locking means lock-free programming, usually with the CAS algorithm. The typical example is the atomic classes, which perform updates through CAS spin operations. A heavyweight lock is a pessimistic lock; spin locks, lightweight locks and biased locks are optimistic locks.”
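The "try to update, retry on failure" idea can be sketched with AtomicInteger.compareAndSet. The class CasCounterSketch and its methods are made up for this illustration; in real code AtomicInteger.incrementAndGet already does the same job.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCounterSketch {
    private final AtomicInteger value = new AtomicInteger();

    // Optimistic update: read, compute, then CAS. If another thread changed
    // the value in between, the CAS fails and we simply retry.
    public int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public int get() {
        return value.get();
    }

    // Hypothetical demo: several threads increment the same counter without any lock.
    public static int runDemo(int threads, int perThread) {
        CasCounterSketch counter = new CasCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10000)); // prints 40000: no update is lost
    }
}
```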

Segmented lock

  1. Segmented locking is a lock design, not a specific lock. ConcurrentHashMap implements its concurrency with segmented locks to achieve efficient concurrent operations (this describes the JDK 7 implementation; JDK 8 replaced segments with CAS plus synchronized on individual buckets).
  2. The segment lock in ConcurrentHashMap is called Segment. Its structure is similar to HashMap (the JDK7 and JDK8 implementation of HashMap): internally it has an array of entries, and each element of the array is a linked list. At the same time it is a ReentrantLock (Segment inherits from ReentrantLock).
  3. “When you put an element, the whole map is not locked. Instead, the hashCode determines which segment the element belongs to, and only that segment is locked. So with multithreaded puts, as long as they do not land in the same segment, the inserts really run in parallel. However, to obtain global information about the map, all the segment locks have to be acquired.”
  4. Segmented locking is designed to refine the granularity of the lock: when an operation does not need to update the entire array, only one item in the array is locked.
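The idea can be sketched in a few lines. To be clear, this is not the source of ConcurrentHashMap; StripedMapSketch and everything in it are made up to illustrate "one lock per segment, all locks for global information".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class StripedMapSketch<K, V> {
    private final ReentrantLock[] locks;
    private final List<Map<K, V>> segments = new ArrayList<>();

    public StripedMapSketch(int stripes) {
        locks = new ReentrantLock[stripes];
        for (int i = 0; i < stripes; i++) {
            locks[i] = new ReentrantLock();
            segments.add(new HashMap<>());
        }
    }

    // The hash code decides which segment (and therefore which lock) a key uses.
    private int stripeFor(Object key) {
        return (key.hashCode() & 0x7fffffff) % locks.length;
    }

    public V put(K key, V value) {
        int i = stripeFor(key);
        locks[i].lock(); // lock only this segment, not the whole map
        try {
            return segments.get(i).put(key, value);
        } finally {
            locks[i].unlock();
        }
    }

    // Global information: every segment lock is taken before counting.
    public int size() {
        for (ReentrantLock lock : locks) {
            lock.lock();
        }
        try {
            int total = 0;
            for (Map<K, V> segment : segments) {
                total += segment.size();
            }
            return total;
        } finally {
            for (ReentrantLock lock : locks) {
                lock.unlock();
            }
        }
    }

    // Hypothetical demo: each thread inserts its own distinct keys.
    public static int runDemo(int threads, int perThread) {
        StripedMapSketch<String, Integer> map = new StripedMapSketch<>(16);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    map.put(id + "-" + j, j);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```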

Bias lock/lightweight lock/heavyweight lock

  1. “These three are states of a lock and apply specifically to synchronized. Java 5 made synchronized efficient by introducing a lock upgrade mechanism. The three states are indicated by fields in the object header used by the object monitor. A biased lock means that when a piece of synchronized code is always accessed by the same thread, that thread acquires the lock automatically, which lowers the cost of acquiring the lock.”
  2. “Biased locking applies when only one thread ever executes the synchronized block, and no other thread executes it before that thread finishes and releases the lock. It is meant for locks without contention. Once contention occurs, the lock is upgraded to a lightweight lock, and the upgrade requires revoking the bias, which causes a stop-the-world (STW) pause. Under lock contention a biased lock does a lot of extra work, especially bias revocation, which requires reaching a safepoint and therefore an STW pause, so performance degrades. In that case biased locking should be disabled.”
  3. “A lightweight lock means that when a biased lock is accessed by another thread, it is upgraded to a lightweight lock. Other threads try to acquire the lock by spinning instead of blocking, which improves performance.”
  4. “A heavyweight lock means that when the lock is lightweight, other threads spin, but they do not spin forever. After spinning a certain number of times without getting the lock, they block and the lock inflates into a heavyweight lock. A heavyweight lock makes the other threads that request it block, which lowers performance.”

Spin lock

  1. In Java, a spin lock means that a thread trying to acquire the lock does not block immediately, but keeps trying to acquire it in a loop. The advantage is fewer thread context switches; the disadvantage is that the loop consumes CPU.
  2. “The principle of a spin lock is simple. If the thread holding the lock will release it within a very short time, the threads waiting for the lock do not need to switch between user mode and kernel mode to enter a blocked, suspended state. They only need to wait a little (spin) and can acquire the lock as soon as the holder releases it, which avoids the cost of switching between user thread and kernel.”
  3. Spin locks reduce thread blocking as much as possible. For code blocks where lock contention is not intense and the lock is held only very briefly, performance improves significantly, because spinning costs less than blocking, suspending and waking up a thread.
  4. “But if lock contention is intense, or the thread holding the lock needs to hold it for a long time while executing the synchronized block, a spin lock is not suitable. A spinning thread occupies the CPU doing useless work until it gets the lock. When many threads compete for one lock, acquiring it takes a long time, and the cost of spinning exceeds the cost of blocking and suspending the thread. Other threads that need the CPU cannot get it, so CPU time is wasted.”
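A spin lock can be sketched with a single AtomicReference that records the owning thread. SpinLockSketch is made up for this illustration; it is not reentrant and not what the JVM does internally for synchronized, it only shows the "loop instead of block" idea.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockSketch {
    // null means "free"; otherwise it holds the thread that owns the lock.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    public void lock() {
        Thread me = Thread.currentThread();
        // Busy-wait: keep retrying the CAS instead of blocking the thread.
        while (!owner.compareAndSet(null, me)) {
            // spinning burns CPU; this is the cost described in the list above
        }
    }

    public void unlock() {
        // Only the owning thread can release the lock.
        owner.compareAndSet(Thread.currentThread(), null);
    }

    // Hypothetical demo: a plain int protected only by the spin lock.
    public static int runDemo(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++; // very short critical section, the good case for spinning
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```

If the critical section were long, every waiting thread would sit in that while loop the whole time, which is exactly the case where blocking is cheaper.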

Java lock summary

Java locking mechanisms fall into two groups: synchronized locks and Lock locks. synchronized ensures data synchronization at the JVM level, while Lock relies on special CPU instructions to synchronize data at the hardware level.

  • synchronized is an unfair, pessimistic, exclusive, mutually exclusive, reentrant, heavyweight lock.
  • ReentrantLock is a pessimistic, exclusive, mutually exclusive, reentrant, heavyweight lock. It is unfair by default but can be created as a fair lock.
  • ReentrantReadWriteLock is a pessimistic, write-exclusive, read-shared, read/write, reentrant, heavyweight lock. It is unfair by default but can be created as a fair lock.

To clarify, I have split this article into two parts because the platform limits article length. This is the second part; follow me for part one.

Writing is not easy; if you found this helpful, please give it a small star. GitHub address 😁 😁 😁