Category: Java concurrency
Reading time: about 5 minutes
Author: Huang Junbin
Link: jianshu.com/p/dfeac8f3f9db
Source: Jianshu
Copyright belongs to the author. Published with the author's authorization; unauthorized reprinting is prohibited.
Threads (Thread)
01
Introduction
The thread is the smallest unit of scheduling in a modern operating system and is also called a lightweight process. A process can create multiple threads. Each thread has its own program counter, stack, and local variables, and all of them can access shared memory variables. The processor switches between these threads at high speed, which gives the user the impression that they are executing simultaneously.
02
Why multithreading
- More processor cores
- Faster response times
- A better programming model
03
Thread state
A Java thread can be in one of six states during its lifetime (NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, and TERMINATED), and it is in exactly one of these states at any given moment.
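As a small illustration that is not part of the original article, the sketch below watches one thread move through three of the states defined by Thread.State. The class name ThreadStateDemo and the helper observeStates() are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStateDemo {

    // Hypothetical helper: records the states one thread passes through.
    static List<Thread.State> observeStates() {
        final Object lock = new Object();
        final boolean[] released = {false};    // guarded by lock
        List<Thread.State> seen = new ArrayList<>();

        Thread t = new Thread(() -> {
            synchronized (lock) {
                while (!released[0]) {
                    try {
                        lock.wait();           // parks the thread in WAITING
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "StateDemoThread");

        seen.add(t.getState());                // NEW: created but not started
        t.start();
        try {
            while (t.getState() != Thread.State.WAITING) {
                Thread.sleep(10);              // poll until the thread has called wait()
            }
            seen.add(t.getState());            // WAITING
            synchronized (lock) {
                released[0] = true;
                lock.notifyAll();
            }
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen.add(t.getState());                // TERMINATED: run() has finished
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates());   // prints [NEW, WAITING, TERMINATED]
    }
}
```

RUNNABLE, BLOCKED, and TIMED_WAITING are harder to catch reliably from outside the thread, so this sketch leaves them out.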
04
Common Methods
05
Wait and notify
One thread modifies the value of an object, and another thread senses the change and acts on it. The whole process starts in one thread and finishes in another. The former is the producer and the latter is the consumer. This pattern separates the “what” from the “how”, decouples the two sides at the functional level, and scales well architecturally.
The waiting party follows these rules:
- Acquire the object's lock.
- If the condition is not met, call the object's wait() method so that the thread enters the WAITING state. After being notified, check the condition again.
- If the condition is met, execute the corresponding logic.
Pseudocode:
synchronized (object) {
    while (condition is not met) {
        object.wait();
    }
    corresponding processing logic
}
The notifying party follows these rules:
- Acquire the object's lock.
- Change the condition.
- Notify all threads waiting on the object.
Pseudocode:
synchronized (object) {
    change the condition
    object.notifyAll();
}
Implementation code:
import java.util.concurrent.TimeUnit;

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
            // Acquire the lock
            synchronized (lock) {
                // While the condition is not met, enter the WAITING state and release the lock
                while (flag) {
                    System.out.println("flag is true");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // The condition is met
                System.out.println("doSomething");
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
            // Acquire the lock
            synchronized (lock) {
                // Change the condition, then notify. notifyAll() does not release the lock:
                // WaitThread cannot return from wait() until this thread leaves the
                // synchronized block.
                flag = false;
                lock.notifyAll();
                System.out.println("flag is false now");
            }
        }
    }
}
The output is as follows:
flag is true
flag is false now
doSomething
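To show the same wait/notify rules in a producer and consumer setting, here is a minimal sketch of a one-slot mailbox. It is an illustration only: HandoffDemo, Mailbox, and transfer() are names invented for this example and do not appear in the original article.

```java
import java.util.ArrayList;
import java.util.List;

class HandoffDemo {

    // One-slot mailbox (hypothetical class for this sketch); null means "empty".
    static class Mailbox {
        private String message;             // guarded by this

        synchronized void put(String m) throws InterruptedException {
            while (message != null) {       // condition not met: the slot is still full
                wait();
            }
            message = m;                    // change the condition ...
            notifyAll();                    // ... then notify the waiting threads
        }

        synchronized String take() throws InterruptedException {
            while (message == null) {       // condition not met: nothing to take yet
                wait();
            }
            String m = message;
            message = null;
            notifyAll();
            return m;
        }
    }

    // Sends count messages from a producer thread to the calling (consumer) thread.
    static List<String> transfer(int count) {
        Mailbox box = new Mailbox();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    box.put("msg-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        producer.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(box.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3));    // prints [msg-0, msg-1, msg-2]
    }
}
```

Both sides re-check their condition in a while loop after wait() returns, which is the same rule the waiting party follows above.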
Thread pools (ThreadPool)
01
Application scenarios of thread pools
Thread pools are the most widely used concurrency framework in Java, and can be used by almost any program that needs to perform tasks asynchronously or concurrently. There are three benefits to using thread pools properly during development:
- Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.
- Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.
- Improve thread manageability. Threads are scarce resources. If created without limit, they consume system resources and degrade system stability. Thread pools can be used for uniform allocation, tuning, and monitoring.
However, to make good use of thread pools, you must have a thorough understanding of how they are implemented.
02
Implementation principle of thread pool
ThreadPoolExecutor
When execute() is called, ThreadPoolExecutor handles the task in one of the following four ways:
- If fewer threads than corePoolSize are currently running, a new thread is created to run the task (note: this step requires acquiring a global lock).
- If the number of running threads is equal to or greater than corePoolSize, the task is added to the BlockingQueue.
- If the task cannot be added to the BlockingQueue (the queue is full), a new thread is created to process the task (note: this step requires acquiring a global lock).
- If creating a new thread would push the number of running threads beyond maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called.
The overall design idea behind these steps is to avoid acquiring the global lock as much as possible when executing execute(), since that would be a serious scalability bottleneck. After ThreadPoolExecutor has warmed up (the number of running threads is greater than or equal to corePoolSize), almost every execute() call takes step 2, which does not require the global lock.
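The four cases can be observed with a deliberately tiny pool. The sketch below is an illustration under assumed settings (corePoolSize 1, maximumPoolSize 2, a queue with capacity 1), and the class name ExecuteStepsDemo is made up for this example. Every task blocks on a latch so that no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {

    static List<String> run() {
        // Assumed demo values: corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();            // keep the worker busy until the demo ends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // The default handler (AbortPolicy) throws when the task is rejected
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown();            // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // task 1: threads=1 queued=0   (case 1: new core thread)
        // task 2: threads=1 queued=1   (case 2: queued)
        // task 3: threads=2 queued=1   (case 3: queue full, new non-core thread)
        // task 4: rejected             (case 4: maximumPoolSize reached)
    }
}
```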
03
execute() execution flow (based on JDK 7)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get(); // ctl is an AtomicInteger
    // (1)
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // (2)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) // (21)
            reject(command);
        else if (workerCountOf(recheck) == 0) // (22)
            addWorker(null, false); // false: bounded by maximumPoolSize, since corePoolSize may be 0
    }
    // (3)
    else if (!addWorker(command, false))
        reject(command);
}
- (1) First check whether the number of threads in the pool is smaller than the specified core size. If so, try to create a new thread with command as its first task and add it to the pool. Other threads may call addWorker(command, true) concurrently after we have read the worker count, so the call can fail. If it succeeds, return directly. If it fails, re-read ctl, which must have changed (otherwise the call would not have failed), and continue with (2).
- (2) Use isRunning(c) to check that the pool is still running, then try to add command to the blocking queue. Offering is also concurrent and may fail; if it does, continue with (3). After the task has been queued, check again, because the pool may have been shut down in the meantime, or every thread in the pool may have exited after idling longer than the specified keepAliveTime. If the pool is no longer in the RUNNING state, remove the task and hand it to the rejection policy (21). If the pool has no threads left, create a worker without a first task and let it take tasks from the blocking queue (22).
- (3) If neither of the steps above succeeds, try to create a thread within the specified maximum pool size. This can fail too, for example when other threads are adding workers concurrently and the maximum has been reached. If it fails, the task is handed to the rejection policy.
private boolean addWorker(Runnable firstTask, boolean core) {
    // (1) Loop on CAS to increase the number of threads in the pool by 1
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // core == true: the bound is corePoolSize; core == false: the bound is maximumPoolSize.
            // If the number of threads has reached the bound, nothing can be added; return directly.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS increments the worker count in ctl, reserving a slot for the thread to be added
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // Go back to the outer loop if the state of the thread pool has changed
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // (2) Create a new thread and add it to the workers set
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Operations on workers must hold the lock
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // Lock only here, so that the critical section stays small
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check the state of the thread pool
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // If the new thread is already alive, throw an exception
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the new worker to the pool
                    workers.add(w);
                    int s = workers.size();
                    // Update largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // The worker was added to the pool, so start the newly created thread
            if (workerAdded) {
                t.start(); // (3)
                workerStarted = true;
            }
        }
    } finally {
        // If the worker was not added or its thread failed to start, call addWorkerFailed,
        // which removes the worker from workers (if it was added) and restores the ctl count
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
04
How to implement thread reuse
In the thread pool, every thread is wrapped in a Worker. Worker is an inner class of ThreadPoolExecutor that extends AbstractQueuedSynchronizer and implements Runnable.
Worker's run method calls runWorker, and runWorker loops on getTask to fetch tasks from the blocking queue, which is how a thread is reused.
Worker has the following three main fields, and the code is relatively simple:
// The thread that actually runs in the pool, created by the thread factory we specified
final Thread thread;
// The task wrapped by the thread; the thread mainly calls this task's run method when it runs
Runnable firstTask;
// The number of tasks completed by this thread
volatile long completedTasks;
Worker's constructor:
Worker(Runnable firstTask) {
    setState(-1); // Inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // Create a thread using the thread factory we specified. Note that the argument is this,
    // so when the thread runs, it actually executes the run method of the Worker class
    this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure the thread is interrupted.
            // Otherwise make sure it is not: Thread.interrupted() clears the flag,
            // and the state is rechecked afterwards.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook called before the task starts, similar to a callback
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook called after the task finishes, similar to a callback
                    afterExecute(task, thrown);
                }
            } finally {
                // After execution: reset the task, increment completedTasks, and unlock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The worker exits, for example because getTask() returned null after the
        // thread had been idle for the time we configured
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // If the pool is shut down and the queue is empty, or the pool is stopped or
        // terminated, decrement the worker count and return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // Whether the current thread should be reclaimed after idling past the timeout
        boolean timed;

        for (;;) {
            int wc = workerCountOf(c);
            // If allowCoreThreadTimeOut is true or the number of threads is greater than
            // corePoolSize, idle threads are subject to timeout reclamation
            timed = allowCoreThreadTimeOut || wc > corePoolSize; // (1)

            // If the number of threads does not exceed the maximum, and the thread either
            // is not subject to timeout or has not timed out, leave the loop and go on to
            // block on the queue
            if (wc <= maximumPoolSize && !(timedOut && timed))
                break;
            // Otherwise decrement the worker count and return null so the thread is reclaimed
            if (compareAndDecrementWorkerCount(c))
                return null;
            // The CAS failed: re-read ctl and try again
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        // (2)
        try {
            // If idle reclamation applies, call poll with a timeout on the blocking queue;
            // otherwise call take, which blocks until a task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r is null, so the poll timed out. Set timedOut = true and enter the next
            // iteration, where the check after (1) no longer breaks out of the loop and
            // the CAS that decrements ctl is attempted
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
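Stripped of state checks and locking, the reuse mechanism in runWorker and getTask comes down to one thread looping over a blocking queue. The sketch below is a simplified illustration and not the JDK implementation: WorkerLoopSketch and runTasksOnOneWorker() are invented names, and the 200 ms poll timeout stands in for keepAliveTime.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class WorkerLoopSketch {

    // Runs taskCount queued tasks on a single worker thread and returns the
    // names of the threads that executed them.
    static Set<String> runTasksOnOneWorker(int taskCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> threadNames.add(Thread.currentThread().getName()));
        }

        Thread worker = new Thread(() -> {
            try {
                Runnable task;
                // Like getTask() with a timeout: a null result ends the loop,
                // and the thread exits once it has been idle for too long.
                while ((task = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                    task.run();             // run the task on this same thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-worker");

        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runTasksOnOneWorker(5));   // prints [sketch-worker]
    }
}
```

All five tasks report the same thread name, which is the point: the thread is created once and keeps pulling work until the queue stays empty past the timeout.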
05
How to handle exceptions
If the number of running threads would exceed maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called. The default handler is AbortPolicy.
- ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException.
/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
- ThreadPoolExecutor.DiscardPolicy: also discards the task, but does not throw an exception.
/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
- ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then tries to execute the rejected task again (repeating the process if necessary).
/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
- ThreadPoolExecutor.CallerRunsPolicy: the calling thread runs the task itself.
/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
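To see a non-default policy in action, the sketch below saturates a one-thread pool and checks which thread ends up running the rejected task under CallerRunsPolicy. The pool sizes are assumptions chosen for the demo, and CallerRunsDemo and ranOnCallerThread() are made-up names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns true if the rejected task ran on the thread that called execute().
    static boolean ranOnCallerThread() {
        // Assumed demo values: one worker thread and a queue that holds one task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);          // occupies the only worker thread
            pool.execute(blocker);          // fills the queue
            // Pool and queue are full, so this task is rejected and the
            // policy runs it synchronously on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return Thread.currentThread().getName().equals(ranOn.get());
    }

    public static void main(String[] args) {
        System.out.println(ranOnCallerThread());   // prints true
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which slows producers down when the pool is saturated.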
Executor Framework
01
FixedThreadPool
FixedThreadPool is a thread pool that reuses a fixed number of threads:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool suits scenarios where the number of threads must be limited to meet resource management requirements, such as heavily loaded servers.
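A quick way to check the thread limit is to submit more tasks than there are threads and count the distinct worker names. This is a minimal sketch; FixedPoolDemo and distinctWorkerThreads() are names invented for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {

    // Submits taskCount tasks and returns how many distinct threads ran them.
    static int distinctWorkerThreads(int nThreads, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                    // accept no new tasks, finish the queued ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 20 tasks, but never more than 3 worker threads
        System.out.println(distinctWorkerThreads(3, 20));
    }
}
```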
02
SingleThreadExecutor
SingleThreadExecutor is an Executor that uses a single worker thread. It is created through the Executors API:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(
                    1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}
SingleThreadExecutor suits scenarios where tasks must be executed sequentially and no more than one thread may be active at any point in time.
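The sequential guarantee is easy to observe: tasks submitted from one thread run in submission order. The following is a small sketch, with SingleThreadOrderDemo and runInOrder() as hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {

    // Submits tasks 0..taskCount-1 and returns the order in which they ran.
    static List<Integer> runInOrder(int taskCount) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));   // prints [0, 1, 2, 3, 4]
    }
}
```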
03
CachedThreadPool
CachedThreadPool creates new threads as needed. It is created through the Executors API:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}
CachedThreadPool is an unbounded thread pool. It suits small programs that run many short-lived asynchronous tasks, or lightly loaded servers.
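"Unbounded" means the pool adds a thread whenever no idle thread is available to take a task. The sketch below builds a ThreadPoolExecutor with the same constructor arguments as newCachedThreadPool() shown above, so that getPoolSize() can be read without a cast. CachedPoolGrowthDemo and its method name are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolGrowthDemo {

    // Submits taskCount tasks that all block, then reports the pool size.
    static int poolSizeWithBlockedTasks(int taskCount) {
        // Same arguments as Executors.newCachedThreadPool()
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                // No idle thread is waiting on the SynchronousQueue, so each
                // execute() call has to start a new thread.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeWithBlockedTasks(5));   // prints 5
    }
}
```

With long-running or blocking tasks the thread count grows with the number of submissions, which is why this pool fits short tasks and light load.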
04
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor is usually created with the factory class Executors. Executors can create two types of ScheduledThreadPoolExecutor:
- ScheduledThreadPoolExecutor: contains several threads.
- SingleThreadScheduledExecutor: a ScheduledThreadPoolExecutor that contains only a single thread.
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService(
            new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor suits scenarios where several background threads run periodic tasks and the number of background threads must be limited to meet resource management requirements.
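A periodic task can be sketched as follows. The 50 ms period, the class name PeriodicTaskDemo, and the helper runsAtLeast() are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicTaskDemo {

    // Schedules a task every 50 ms and waits until it has run the given number of times.
    static boolean runsAtLeast(int times) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(times);
        try {
            // Initial delay 0, then one run every 50 ms
            scheduler.scheduleAtFixedRate(remaining::countDown, 0, 50, TimeUnit.MILLISECONDS);
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();        // cancel the periodic task and stop the worker
        }
    }

    public static void main(String[] args) {
        System.out.println(runsAtLeast(3));
    }
}
```

A periodic task keeps running until it is cancelled or the executor is shut down, so the sketch shuts the scheduler down in a finally block.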