❝
This is the second part of the painstaking personal series “Summing up Java Multithreading”, continuing from the outline diagram in part one.
❞
Concurrent collection container
Why is ArrayList not thread-safe?
“Look at the source of the add method” :
public boolean add(E e) {
    /**
     * Adding an element does two things:
     * 1. Check whether the list's element array has enough capacity, expanding it if necessary
     * 2. Actually place the element into the element array
     */
    ensureCapacityInternal(size + 1); // Increments modCount!! The capacity check may pass for two threads at once, letting the next write go out of bounds
    elementData[size++] = e;          // Not atomic: may leave a null slot
    return true;
}
“Array out of bounds” :
- The list size is 9, that is, size = 9.
- Thread A enters the add method; it reads size as 9 and calls ensureCapacityInternal for the capacity check.
- Thread B now enters add as well; it also reads size as 9 and also calls ensureCapacityInternal.
- Thread A finds that the required capacity is 10 and that elementData has length 10, so it fits; no expansion happens and the check returns.
- Thread B likewise finds that the required capacity of 10 fits, and returns.
- Thread A performs the assignment, elementData[size++] = e; size becomes 10.
- Thread B then performs its assignment. It tries to set elementData[10] = e, but the array was never expanded and its maximum subscript is 9, so an ArrayIndexOutOfBoundsException is thrown.
“Null value case” :
“elementData[size++] = e is not an atomic operation”: it really consists of two steps:
elementData[size] = e;
size = size + 1;
Logic:
- The list size is 0, i.e. size=0
- Thread A starts adding an element with value A. It performs the first step, placing A at elementData subscript 0.
- Thread B then happens to start adding an element with value B and also reaches the first step. It still reads size as 0, so it too places B at elementData subscript 0.
- Thread A performs the second step, incrementing size to 1.
- Thread B performs the second step, incrementing size to 2.
“So when threads A and B have both finished, ideally size would be 2 with A at elementData[0] and B at elementData[1]. Instead, one of A and B has overwritten the other at subscript 0, and there is nothing at subscript 1. That slot stays null until the set method changes it, because size is 2 and new elements are appended starting from subscript 2.”
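The race is easy to reproduce. A minimal sketch (the thread and iteration counts are arbitrary; the failure is probabilistic, so it may take several runs to see it):
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ArrayListUnsafeDemo {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        // 30 threads appending concurrently to one unsynchronized ArrayList
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    list.add(UUID.randomUUID().toString().substring(0, 8));
                }
            }, "T" + i).start();
        }
        // Expect occasional ArrayIndexOutOfBoundsException, null elements, or a final size below 3000
    }
}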
What are the thread-safe alternatives for these unsafe collections?
ArrayList -> Vector -> Collections.synchronizedList -> CopyOnWriteArrayList
HashSet -> Collections.synchronizedSet -> CopyOnWriteArraySet
HashMap -> Collections.synchronizedMap -> ConcurrentHashMap
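A quick sketch of how each replacement is constructed (illustrative only; in practice pick one per collection):
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

public class SafeCollections {
    public static void main(String[] args) {
        // List: legacy Vector, a synchronized wrapper, or copy-on-write (good for read-heavy workloads)
        List<String> vector   = new Vector<>();
        List<String> syncList = Collections.synchronizedList(new ArrayList<>());
        List<String> cowList  = new CopyOnWriteArrayList<>();
        // Set
        Set<String> syncSet = Collections.synchronizedSet(new HashSet<>());
        Set<String> cowSet  = new CopyOnWriteArraySet<>();
        // Map
        Map<String, String> syncMap       = Collections.synchronizedMap(new HashMap<>());
        Map<String, String> concurrentMap = new ConcurrentHashMap<>();
    }
}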
Concurrency synchronizers
AQS principle
AQS uses an “int member variable to represent the synchronization state” and “a built-in FIFO queue in which threads line up to acquire the resource”. AQS modifies the synchronization state atomically using CAS.
private volatile int state; // the shared state variable; volatile guarantees visibility across threads
The state is manipulated through three protected methods: getState, setState and compareAndSetState.
// Returns the current value of the synchronization state
protected final int getState() {
    return state;
}

// Sets the synchronization state
protected final void setState(int newState) {
    state = newState;
}

// Atomically sets the state to update if the current value equals expect
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
“AQS defines two modes of resource sharing” :
- “Exclusive”: only one thread can execute at a time, e.g. ReentrantLock. ReentrantLock comes in fair and unfair variants and supports both.
Summary: there are only two differences between a fair and an unfair lock (see the sketch after this list):
- When lock() is called, an unfair lock first tries to CAS the state and grab the lock directly; if the lock happens to be free, it acquires it immediately and returns.
- After that CAS fails, the unfair lock enters the tryAcquire method just like a fair lock. Inside tryAcquire, if the lock turns out to be free (state == 0), the unfair lock grabs it directly via CAS, whereas the fair lock first checks whether any thread is already waiting in the queue; if so, it does not grab the lock but obediently queues up at the back.
- “Shared”: multiple threads can execute simultaneously, e.g. Semaphore, CountDownLatch, CyclicBarrier, ReadWriteLock.
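For reference, the two tryAcquire variants, abridged from the JDK source of ReentrantLock (the reentrancy overflow check is omitted); the fair version differs only by the hasQueuedPredecessors() check:
// Unfair: Sync.nonfairTryAcquire, grab the lock whenever it is free
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // free: CAS-grab it, queued waiters or not
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        setState(c + acquires);                // reentrant acquisition
        return true;
    }
    return false;
}

// Fair: FairSync.tryAcquire, only grab when nobody is queued ahead of us
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&        // the only difference
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        setState(c + acquires);
        return true;
    }
    return false;
}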
AQS uses the template method pattern. To customize the synchronizer, you need to override the following template methods provided by AQS:
isHeldExclusively()   // Whether the current thread holds the resource exclusively. Only needed if you use Condition.
tryAcquire(int)       // Exclusive mode. Try to acquire the resource; true on success, false on failure.
tryRelease(int)       // Exclusive mode. Try to release the resource; true on success, false on failure.
tryAcquireShared(int) // Shared mode. Try to acquire the resource. Negative: failure; 0: success with no resources left; positive: success with resources remaining.
tryReleaseShared(int) // Shared mode. Try to release the resource; true on success, false on failure.
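Putting the template methods to work: a minimal non-reentrant mutex built on AQS, simplified from the example in the AbstractQueuedSynchronizer javadoc (state 0 = unlocked, 1 = locked):
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class Mutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, 1);   // grab the lock atomically
        }
        protected boolean tryRelease(int releases) {
            setState(0);                       // release; AQS then wakes the next queued thread
            return true;
        }
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()       { sync.acquire(1); }   // queues in the FIFO until acquired
    public void unlock()     { sync.release(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }
}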
CountDownLatch
CountDownLatch is an implementation of a “shared lock”. Its constructor initializes the AQS “state to count”. Each time a thread calls the countDown method, tryReleaseShared “decrements the state via CAS”, until “state == 0, which means every thread has called countDown”. “When we call await, if state is not 0, some threads have not yet called countDown”, so the awaiting threads are parked in the AQS blocking queue, spinning on a CAS check of state == 0. When the last thread calls countDown and state reaches 0, the blocked threads see the check succeed and continue executing.
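Concretely, this is how CountDownLatch's internal Sync implements the shared-mode hooks (abridged from the JDK source; countDown() maps to releaseShared(1), await() to acquireSharedInterruptibly(1)):
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) { setState(count); }

    protected int tryAcquireShared(int acquires) {
        // await() only succeeds once the count has dropped to 0
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // countDown(): spin-CAS the state down by one; wake the waiters when it hits 0
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}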
“Three uses” :
- A thread waits for n other threads to finish before it starts running. Initialize the counter to n with new CountDownLatch(n); each task thread calls countDownLatch.countDown() when it completes, and when the counter reaches 0 the thread blocked on await() wakes up. A typical scenario: at service startup, the main thread waits for several components to finish loading before continuing.
- Achieving maximum parallelism when multiple threads start executing a task. Note: parallelism, not concurrency; the point is that multiple threads start at the same instant. It is like a race: the runners are placed at the start line and set off together when the gun fires. The approach is to initialize one shared CountDownLatch with a counter of 1: new CountDownLatch(1). The worker threads first call countDownLatch.await(), and when the main thread calls countDown(), the counter drops to 0 and all the waiting threads wake up at the same time.
- Deadlock detection: a very convenient usage is to have n threads access a shared resource, varying the number of threads in each test phase, and try to provoke a deadlock.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        countDownLatchTest();
        // general();
    }

    // Without a latch: the "monitor" has to poll the active thread count
    public static void general() {
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " finishes studying and leaves the classroom");
            }, "Thread --> " + i).start();
        }
        while (Thread.activeCount() > 2) {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        System.out.println(Thread.currentThread().getName() + " ==== the monitor finally leaves");
    }

    // With a latch: the main thread blocks on await() until all 6 threads count down
    public static void countDownLatchTest() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " finishes studying and leaves the classroom");
                countDownLatch.countDown();
            }, "Thread --> " + i).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + " ==== the monitor finally leaves");
    }
}
CyclicBarrier
CyclicBarrier literally means “cyclic (reusable) barrier”. What it does is “make a group of threads block when they reach a barrier (also called a synchronization point); the barrier does not open until the last thread arrives, at which point all the blocked threads continue working”. The default constructor is CyclicBarrier(int parties), whose argument is the number of threads the barrier intercepts. Each thread calls the await method to tell the CyclicBarrier “I have reached the barrier”, and the current thread blocks.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        cyclicBarrierTest();
    }

    public static void cyclicBarrierTest() {
        // The barrier action runs once, after the 7th thread reaches the barrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("==== Summon the Dragon ====");
        });
        for (int i = 0; i < 7; i++) {
            final int tempInt = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " collected Dragon Ball number " + tempInt);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "" + i).start();
        }
    }
}
Calling await() on a CyclicBarrier object actually calls dowait(false, 0L). The await() method is the act of standing at the barrier: it blocks the thread, and the barrier only opens, letting the threads continue executing, once the number of blocked threads reaches parties.
// The code after await executes only once the number of arrived threads reaches count.
// In the example above, count starts at 7.
private int count;

/**
 * Main barrier code, covering the various policies.
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // Lock
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        // If the thread was interrupted, break the barrier and throw
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // count minus 1
        int index = --count;
        // If count drops to 0, the last thread has reached the barrier
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // Reset count to the initial parties value,
                // wake up the waiting threads,
                // and start the next generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // Loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
Summary: CyclicBarrier uses a count variable as its internal counter, initialized from the parties property and decremented by one each time a thread reaches the barrier. When count reaches zero, the current thread is the last of this generation to arrive, and the barrier action passed to the constructor (if any) is executed.
Semaphore
synchronized and ReentrantLock allow only one thread to access a resource at a time, whereas Semaphore allows multiple threads to access a resource simultaneously.
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);   // simulate 3 parking spaces
        for (int i = 0; i < 6; i++) {             // simulate 6 cars
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " grabs a parking space");
                    // Park for 3 seconds
                    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println(Thread.currentThread().getName() + " leaves the parking space after 3 seconds");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, "Car " + i).start();
        }
    }
}
Blocking queue
- ArrayBlockingQueue: a bounded blocking queue backed by an array.
- LinkedBlockingQueue: an optionally-bounded (default capacity Integer.MAX_VALUE) blocking queue backed by a linked list.
- PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering.
- DelayQueue: an unbounded blocking queue of delayed elements, implemented on a priority queue.
- SynchronousQueue: a blocking queue that stores no elements; each insert must wait for a corresponding remove.
- LinkedTransferQueue: an unbounded blocking queue backed by a linked list.
- LinkedBlockingDeque: a two-ended (deque) blocking queue backed by a linked list.
- Throw an exception when the operation fails: add / remove
- Return a special value instead of throwing: offer / poll
- Block until the operation can proceed: put / take
- Wait with a timeout: offer / poll with a time argument
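A small sketch of the four method families on an ArrayBlockingQueue (capacity 1 is chosen so every call here succeeds immediately; on a full or empty queue the behaviors differ as commented):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueApiDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        q.add("a");                        // throws IllegalStateException if the queue is full
        q.remove();                        // throws NoSuchElementException if the queue is empty

        boolean inserted = q.offer("b");   // returns false instead of throwing when full
        String head = q.poll();            // returns null instead of throwing when empty

        q.put("c");                        // blocks until space becomes available
        q.take();                          // blocks until an element becomes available

        q.offer("d", 2, TimeUnit.SECONDS); // waits up to 2s for space, then gives up
        q.poll(2, TimeUnit.SECONDS);       // waits up to 2s for an element, then returns null
    }
}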
Producer-consumer
The synchronized version of producer-consumer is cumbersome:
import java.util.LinkedList;

public class ProdConsumerSynchronized {
    private final LinkedList<String> lists = new LinkedList<>();

    public synchronized void put(String s) {
        while (lists.size() != 0) {   // use while, not if: re-check the condition after being woken up
            // Full: wait
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        lists.add(s);
        System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
        this.notifyAll();   // notifies all suspended threads, including other producers
    }

    public synchronized void get() {
        while (lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
        this.notifyAll();   // use notifyAll, not notify: notify might wake the wrong thread and deadlock
    }

    public static void main(String[] args) {
        ProdConsumerSynchronized prodConsumerSynchronized = new ProdConsumerSynchronized();
        // Start the consumer threads
        for (int i = 0; i < 5; i++) {
            new Thread(prodConsumerSynchronized::get, "ConsA" + i).start();
        }
        // Start the producer threads
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                prodConsumerSynchronized.put("" + tempI);
            }, "ProdA" + i).start();
        }
    }
}
ReentrantLock
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProdConsumerReentrantLock {
    private final LinkedList<String> lists = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition prod = lock.newCondition();
    private final Condition cons = lock.newCondition();

    public void put(String s) {
        lock.lock();
        try {
            // 1. Check
            while (lists.size() != 0) {
                // Full: cannot produce, wait
                prod.await();
            }
            // 2. Work
            lists.add(s);
            System.out.println(Thread.currentThread().getName() + " " + lists.peekFirst());
            // 3. Notify
            cons.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            // 1. Check
            while (lists.size() == 0) {
                // Empty: cannot consume, wait
                cons.await();
            }
            // 2. Work
            System.out.println(Thread.currentThread().getName() + " " + lists.removeFirst());
            // 3. Notify
            prod.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProdConsumerReentrantLock prodConsumerReentrantLock = new ProdConsumerReentrantLock();
        for (int i = 0; i < 5; i++) {
            int tempI = i;
            new Thread(() -> {
                prodConsumerReentrantLock.put(tempI + "");
            }, "ProdA" + i).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(prodConsumerReentrantLock::get, "ConsA" + i).start();
        }
    }
}
BlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProdConsumerBlockingQueue {
    private volatile boolean flag = true;
    private final AtomicInteger atomicInteger = new AtomicInteger();
    private final BlockingQueue<String> blockingQueue;

    public ProdConsumerBlockingQueue(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void myProd() throws Exception {
        String data;
        boolean retValue;
        while (flag) {
            data = atomicInteger.incrementAndGet() + "";
            retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            if (retValue) {
                System.out.println(Thread.currentThread().getName() + " inserted " + data + " into the queue: success");
            } else {
                System.out.println(Thread.currentThread().getName() + " inserted " + data + " into the queue: failure");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " the boss called a halt, flag = false, production ends");
    }

    public void myConsumer() throws Exception {
        String result;
        while (flag) {
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null == result || result.equalsIgnoreCase("")) {
                flag = false;
                System.out.println(Thread.currentThread().getName() + " got nothing within 2s, the consumer quits");
                return;
            }
            System.out.println(Thread.currentThread().getName() + " consumed " + result + " from the queue: success");
        }
    }

    public void stop() {
        flag = false;
    }

    public static void main(String[] args) {
        ProdConsumerBlockingQueue prodConsumerBlockingQueue = new ProdConsumerBlockingQueue(new ArrayBlockingQueue<>(10));
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " production thread started");
            try {
                prodConsumerBlockingQueue.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Prod").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " consumer thread started");
            try {
                prodConsumerBlockingQueue.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("After 5s main calls stop, the threads end");
        prodConsumerBlockingQueue.stop();
    }
}
Thread pools
Benefits of thread pools
- “Reduce resource consumption.” Reduce the cost of thread creation and destruction by reusing created threads.
- “Improve response speed”. When a task arrives, it can be executed immediately without waiting for the thread to be created.
- Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.
FixedThreadPool
A FixedThreadPool is a reusable thread pool with a fixed number of threads. Look at the relevant source in the Executors class:
/**
 * Creates a thread pool that reuses a fixed number of threads
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
Both corePoolSize and maximumPoolSize of a newly created FixedThreadPool are set to nThreads, the value we pass in ourselves.
- If the number of currently running threads is less than corePoolSize, a new thread is created to execute each incoming task.
- If the number of currently running threads equals corePoolSize, new tasks are added to the LinkedBlockingQueue.
- After finishing the task at hand, each thread in the pool loops back to fetch the next task to execute from the LinkedBlockingQueue.
“Not recommended”: using the unbounded queue LinkedBlockingQueue (queue capacity Integer.MAX_VALUE) as the work queue of a FixedThreadPool has the following effects on the pool:
- Once the number of threads in the pool reaches corePoolSize, new tasks wait in the unbounded queue, so the number of threads never exceeds corePoolSize;
- Because the queue is unbounded, maximumPoolSize becomes a meaningless parameter: the queue can never fill up. That is why the FixedThreadPool source sets corePoolSize and maximumPoolSize to the same value;
- Because of points 1 and 2, keepAliveTime is also a meaningless parameter;
- A running FixedThreadPool (one that has not been shutdown() or shutdownNow()) never rejects tasks, so a flood of tasks can cause OOM (out of memory). A bounded alternative is sketched below.
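If fixed-size behavior is still wanted without the unbounded-queue risk, a sketch of a manually constructed pool (the size 4, capacity 1024 and CallerRunsPolicy are illustrative choices, not from the original):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedFixedPool {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                4, 4,                                       // fixed size, like FixedThreadPool
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1024),             // bounded queue: tasks cannot pile up without limit
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure on the submitter instead of OOM
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}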
SingleThreadExecutor
/**
 * Returns a thread pool with only one thread
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
Same as above, except corePoolSize and maximumPoolSize are both set to 1.
CachedThreadPool
/**
 * Creates a thread pool that creates new threads as needed,
 * but reuses previously constructed threads when they are available.
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
CachedThreadPool's corePoolSize is set to 0 and its maximumPoolSize to Integer.MAX_VALUE, i.e. it is unbounded. This means that if the main thread submits tasks faster than the threads in the maximumPool can process them, CachedThreadPool keeps creating new threads; in extreme cases this exhausts CPU and memory.
- First, SynchronousQueue.offer(Runnable task) is executed to hand off the task. If an idle thread in the maximumPool is currently executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), the main thread's offer pairs with the idle thread's poll, the task is handed to the idle thread, and execute() completes; otherwise, go to step 2.
- When the maximumPool is initially empty, or contains no idle thread (no thread is executing poll), the offer in step 1 fails. In that case CachedThreadPool creates a new thread to execute the task, and the execute method completes. The handoff is demonstrated below.
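The offer/poll pairing can be seen directly on a bare SynchronousQueue; a small sketch (the sleep is only there to let the consumer reach poll first):
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();

        // No consumer is waiting in poll(), so the non-blocking offer() fails immediately
        System.out.println(q.offer("task-1"));      // false

        // With a consumer parked in poll(), the offer pairs with it
        new Thread(() -> {
            try {
                System.out.println("got " + q.poll(5, TimeUnit.SECONDS));
            } catch (InterruptedException ignored) { }
        }).start();
        TimeUnit.MILLISECONDS.sleep(100);           // let the consumer start polling
        System.out.println(q.offer("task-2"));      // true: handed off directly
    }
}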
ScheduledThreadPoolExecutor is omitted; you will rarely use it.
ThreadPoolExecutor (the important one)
/**
 * Creates a new ThreadPoolExecutor with the given initial parameters.
 */
public ThreadPoolExecutor(int corePoolSize,                  // the number of core threads in the pool
                          int maximumPoolSize,               // the maximum number of threads in the pool
                          long keepAliveTime,                // how long an extra idle thread survives when the thread count exceeds the core size
                          TimeUnit unit,                     // time unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue, // the queue holding tasks waiting to be executed
                          ThreadFactory threadFactory,       // used to create threads, usually the default
                          RejectedExecutionHandler handler   // rejection policy for tasks that cannot be handled in time; can be customized
                          ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
The 3 most important ThreadPoolExecutor parameters:
- corePoolSize: defines the minimum number of threads that can run simultaneously.
- maximumPoolSize: once the queue is full, the number of threads that can run simultaneously grows up to this maximum.
- workQueue: when a new task arrives, the pool checks whether the number of running threads has reached the core size; if so, the task is placed in this queue.
Other common ThreadPoolExecutor parameters:
- keepAliveTime: when the number of threads exceeds corePoolSize and no new tasks are being submitted, threads beyond the core are not destroyed immediately; they wait until keepAliveTime has elapsed before being recycled and destroyed.
- unit: the time unit of the keepAliveTime parameter.
- threadFactory: used by the executor when it creates a new thread.
- handler: the saturation policy, described separately below.
“Thread pool parameter relationships” :
If the maximum number of simultaneously running threads has been reached and the queue is full, ThreadPoolExecutor applies one of these policies:
- ThreadPoolExecutor.AbortPolicy: throws RejectedExecutionException to reject the new task.
- ThreadPoolExecutor.CallerRunsPolicy: runs the rejected task in the calling thread itself, so no task request is dropped. This slows down the submission of new tasks and affects overall performance; in effect it adds elastic queue capacity. Choose it if your application can tolerate the delay and cannot afford to drop any task request.
- ThreadPoolExecutor.DiscardPolicy: silently discards the new task.
- ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest unhandled task request.
❝
When Spring creates a thread pool through ThreadPoolTaskExecutor, or we create one directly through the ThreadPoolExecutor constructor, if we do not specify a RejectedExecutionHandler saturation policy, the default ThreadPoolExecutor.AbortPolicy is used: ThreadPoolExecutor throws RejectedExecutionException to reject the new task, which means that task is lost. For scalable applications, ThreadPoolExecutor.CallerRunsPolicy is recommended: when the maximum pool is full, this policy effectively gives us a scalable queue. (Look directly at the source of ThreadPoolExecutor's constructor; to keep things simple the code is not posted here.)
❞
The thread pool objects returned by Executors have the following drawbacks:
- FixedThreadPool and SingleThreadExecutor: the allowed request queue length is Integer.MAX_VALUE, so requests may pile up and cause OOM.
- CachedThreadPool and ScheduledThreadPool: the allowed number of created threads is Integer.MAX_VALUE, so a huge number of threads may be created, causing OOM.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ExecutorService threadpools = new ThreadPoolExecutor(
                3,
                5,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        //new ThreadPoolExecutor.AbortPolicy();
        //new ThreadPoolExecutor.CallerRunsPolicy();
        //new ThreadPoolExecutor.DiscardOldestPolicy();
        //new ThreadPoolExecutor.DiscardPolicy();
        try {
            // 8 tasks exactly fill 5 max threads + 3 queue slots; a 9th would trigger AbortPolicy
            for (int i = 0; i < 8; i++) {
                threadpools.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t Transact business");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadpools.shutdown();
        }
    }
}
Java locking mechanism
Fair lock/unfair lock
“A fair lock means multiple threads acquire the lock in the order in which they requested it. An unfair lock means the acquisition order may differ from the request order, which can cause priority inversion or starvation (an unlucky thread may never get the lock).” ReentrantLock is the typical example, supporting both.
Reentrant lock
“A reentrant lock, also known as a recursive lock, means that when the same thread already holds the lock in an outer method, inner methods that need the same lock acquire it automatically.”
synchronized void setA() throws Exception {
    Thread.sleep(1000);
    setB();   // setB() automatically acquires the lock already held by setA(); if the lock were not reentrant, setB() would never execute
}

synchronized void setB() throws Exception {
    Thread.sleep(1000);
}
Exclusive lock/shared lock
- Exclusive lock: The lock can only be held by one thread at a time.
- Shared lock: The lock can be held by multiple threads.
Mutex/read-write lock
“The exclusive lock/shared lock mentioned above are broad terms; mutex and read-write lock are their concrete implementations.”
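A minimal read-write lock sketch (the class and field are illustrative): many readers may hold the read lock at once, while the write lock is exclusive:
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CachedValue {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    public int get() {
        rw.readLock().lock();    // shared: many readers can hold this simultaneously
        try {
            return value;
        } finally {
            rw.readLock().unlock();
        }
    }

    public void set(int v) {
        rw.writeLock().lock();   // exclusive: blocks both readers and writers
        try {
            value = v;
        } finally {
            rw.writeLock().unlock();
        }
    }
}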
Optimistic lock/pessimistic lock
- “Optimistic lock and pessimistic lock are not specific kinds of locks; they are attitudes toward concurrent synchronization.”
- “A pessimistic lock assumes that concurrent operations on the same data will definitely modify it (even if they do not, it acts as if they will), so it locks every concurrent operation on that data. The pessimistic view is that unlocked concurrent operations are bound to go wrong.”
- “An optimistic lock assumes that concurrent operations on the same data will not conflict; it updates by attempting the write and retrying on failure. The optimistic view is that unlocked concurrent operations are fine.”
- Pessimistic locks suit write-heavy scenarios; optimistic locks suit read-heavy scenarios.
- “Pessimistic locking in Java means using the various lock classes. Optimistic locking in Java means lock-free programming, often based on the CAS algorithm; the typical example is the atomic classes, which implement updates through CAS spinning. Heavyweight locks are pessimistic; spin locks, lightweight locks and biased locks are optimistic.”
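The optimistic style can be seen directly on the atomic classes; a small sketch:
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    public static void main(String[] args) {
        AtomicInteger n = new AtomicInteger(5);

        // Optimistic update: only succeeds if the current value is still the expected one
        System.out.println(n.compareAndSet(5, 6) + " -> " + n.get()); // true -> 6
        System.out.println(n.compareAndSet(5, 7) + " -> " + n.get()); // false -> 6 (someone got there first)

        // The atomic classes spin on CAS internally, retrying until the update wins
        n.getAndIncrement();
        System.out.println(n.get()); // 7
    }
}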
Segmented lock
- Segmented locking is a lock design, not a specific lock. For ConcurrentHashMap, efficient concurrency is implemented through segmented locks.
- “In ConcurrentHashMap the segment lock is called a Segment. It is similar in structure to a HashMap (an array of entries where each array element is a linked list), and at the same time it is a ReentrantLock (Segment extends ReentrantLock).”
- “When you need to put an element, you do not lock the whole map: the hash code tells you which segment the element belongs to, and only that segment is locked. So when multiple threads put, as long as they hit different segments, the inserts are genuinely parallel. Only operations that need global information about the map must acquire all the segment locks.”
- Segmented locking refines the lock granularity: when an operation does not need to update the whole array, only the relevant part of it is locked.
Biased lock/lightweight lock/heavyweight lock
- “These three locks are lock states, specific to synchronized. Java 6 made synchronized efficient by introducing a lock upgrade mechanism; the lock state is recorded in fields of the object header (Mark Word). A biased lock means that when a synchronized block is only ever accessed by one thread, that thread acquires the lock automatically, reducing the cost of acquisition.”
- “Biased locking applies when only a single thread ever executes the synchronized block, i.e. the lock is uncontended. As soon as contention appears, the biased lock is upgraded to a lightweight lock; the upgrade requires revoking the bias, and revoking a biased lock triggers a stop-the-world pause. Under contention, biased locks incur a lot of extra work, especially bias revocation, which forces threads to a safepoint and causes STW, degrading performance; in that situation biased locking should be disabled.”
- “A lightweight lock means that when a biased lock is accessed by another thread, it is upgraded to a lightweight lock: the other threads try to acquire it by spinning instead of blocking, which improves performance.”
- “A heavyweight lock means that while the lock is lightweight, a spinning thread that fails to acquire it after a certain number of spins blocks, and the lock inflates to a heavyweight lock. A heavyweight lock makes all other requesting threads block, which hurts performance.”
Spin lock
- In Java, a spin lock means the thread trying to acquire the lock does not block immediately but attempts to acquire it in a loop. The advantage is avoiding the cost of thread context switches; the disadvantage is that the loop burns CPU.
- “The principle is simple: if the thread holding the lock will release it very quickly, the threads contending for the lock do not need to switch between user mode and kernel mode into a blocked state; they just wait a moment (spin) and can acquire the lock as soon as the holder releases it, avoiding the cost of the user/kernel switch.”
- “Spin locks reduce thread blocking as much as possible. For code with little lock contention and very short lock hold times, performance improves significantly, because the cost of spinning is lower than the cost of blocking, suspending and waking a thread.” (A hand-rolled sketch follows this list.)
- “But if the lock is heavily contended, or the holder occupies it for a long time, spinning is not appropriate: the spinning threads occupy the CPU the whole time while failing to acquire the lock, so the spin cost exceeds the cost of blocking and suspending, and other threads that need the CPU cannot get it, wasting CPU.”
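A hand-rolled spin lock, sketched with an AtomicReference (illustrative, not production code: there is no queue and no fairness):
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockDemo {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    public void lock() {
        Thread current = Thread.currentThread();
        // Spin until we CAS the owner from null to ourselves; no blocking, no context switch
        while (!owner.compareAndSet(null, current)) {
            // busy-wait: burns CPU, so only sensible for very short critical sections
        }
    }

    public void unlock() {
        // Only the owning thread can release the lock
        owner.compareAndSet(Thread.currentThread(), null);
    }
}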
Java lock summary
Java locking mechanisms can be divided into synchronized locks and Lock locks. synchronized implements synchronization at the JVM level, while Lock ultimately relies on special CPU instructions (CAS) for synchronization at the hardware level.
- synchronized is an unfair, pessimistic, exclusive, mutually exclusive, reentrant heavyweight lock.
- ReentrantLock is a pessimistic, exclusive, mutually exclusive, reentrant heavyweight lock; it is unfair by default but can be constructed as fair (see below).
- ReentrantReadWriteLock is a pessimistic, write-exclusive, read-shared, reentrant read-write heavyweight lock; it is unfair by default but can be constructed as fair.
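How the fair variants are requested, as a tiny sketch (the class name is illustrative):
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock unfair = new ReentrantLock();        // unfair by default: higher throughput
        ReentrantLock fair = new ReentrantLock(true);      // fair: FIFO order, less starvation risk
        ReentrantReadWriteLock fairRw = new ReentrantReadWriteLock(true);

        System.out.println(unfair.isFair() + " " + fair.isFair() + " " + fairRw.isFair()); // false true true
    }
}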
❝
To clarify: I split this article into two parts because Juejin limits the article length/word count. This is the second part; follow me for the first.
❞
❝
Creating this was not easy; if you found it helpful, please give it a little star. GitHub address 😁😁😁
❞