Java1.5 added JUC and distributed packages, just like a good Swiss Army knife, greatly enriched Java processing concurrency means, but JUC is not simple, there is a certain learning cost, I have read some JUC implementation source code, but it is not systematic and not deep enough, this decision to start again, I read the book by Doug Lea again, so I am also in the mind of learning to improve my own learning experience.

Why read the source code

I don’t know if you have the feeling that when dealing with concurrency using the tool classes provided in JUC, you have the feeling of rote memorization, such as how LockSupport should be used and what CountDownLatch can do, but you don’t know its implementation principle, only know how and not why. There are two big problems with this state.

  • Rote memorization, relatively easy to forget, easy to use in the work of digging holes, big risk
  • Lack of in-depth understanding of JUC, knowledge can not form a system, difficult to be integrated, flexible application

That to go deep, the most direct and effective way is to read the source code!

Why parse LockSupport first

We know that JUC seems to have a lot of classes and a very complex structure, but if we were to pick the most important class, That must be a queue synchronizer AbstractQueuedSynchronizer and LockSupport AbstractQueuedSynchronizer is used to control the state of the thread, so as to achieve the purpose of switching between threads waiting to wake up. Our focus in dealing with concurrency is managing thread state, so understanding LockSupport is an important foundation.

Simple use of LockSupoort

Let’s start with a simple example

    public static void main(String[] args) {

        Thread worker = new Thread(() -> {
            LockSupport.park();
            System.out.println("start work");
        });

        worker.start();

        System.out.println("main thread sleep"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(worker); try { worker.join(); } catch (InterruptedException e) { e.printStackTrace(); }} main thread sleep start workCopy the code

Start a worker thread, and the main thread will sleep 500ms first. Since the worker thread calls LockSupport’s park, it will wait until the main thread sleep ends and call unpark to wake up the worker thread. So before JUC, we used the following methods to make threads wait

Object monitor = new Object(); synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); }}Copy the code

There are three main differences

  1. Locksupport. park and unpark are not required in the synchronized code block, wait and notify are required.
  2. While pork and unpark for LockSupport are for threads, wait and notify can be arbitrary objects.
  3. LockSupport’s unpark allows a specified thread to be woken up, but notify is used to wake up one thread randomly, and notifyAll is used to wake up all threads.

LockSupport source code interpretation

Before just paving the way, now come to our main course, read LockSupport’s park and unpark methods, of course, there are some other similar overload methods, such as parkUntil, parkNanos, their general principle is similar, interested in you can consult the source code.

The source code analyzed in this and subsequent articles is based on Open Jdk 8.

LockSupport.unpark

Why speak unpark method first, because unpark code is less, relatively simple, persimmon first pick up the soft pinch -. –

    //java.util.concurrent.locks.LockSupport.java
    public static void unpark(Thread thread) {
        if(thread ! = null) UNSAFE.unpark(thread); } // UNSAFE = sun.misc.Unsafe.getUnsafe();Copy the code

Unpark, the UNSAFE object, doesn’t put you off. This class has a lot of useful methods, as discussed in the previous article. For example, to get the memory offset address of an attribute in the UNSAFE object, There are ALSO CAS operations. However, the Unsafe object must be reflected before it can be used properly, because the getUnsafe method determines whether the current class loader is BootStrapClassLoader. We continue looking at the implementation of the Unsafe class unpark.

// Unsafe.java
public native void unpark(Object thread);
Copy the code

Unpark is a native method. Its native implementation is hotspot\ SRC \share\vm\prims\ safe.cpp.

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark"); // Declare a Parker object p, which is the real work object Parker* p = NULL;if(jthread ! = NULL) {// Obtain the oopDesc* object of native layer according to the incoming jThread object, Oop java_thread = JNIHandles:: resolve_non_NULL (jthread);if(java_thread ! Jlong lp = java_lang_Thread::park_event(java_thread); jlong lp = java_lang_Thread:: park_event_offset;if(lp ! P = (Parker*)addr_from_java(lp); }else{// If the address is invalid MutexLocker mu(Threads_lock); java_thread = JNIHandles::resolve_non_null(jthread);if(java_thread ! JavaThread* THR = java_lang_Thread::thread(java_thread);if(thr ! P = THR ->parker(); p = THR ->parker();if(p ! = NULL) { // Bind to Java threadfor// Assign the address of p to _park_event_offset. Java_lang_Thread ::set_park_event(java_thread, addr_to_java(p)); } } } } } }if(p ! = NULL) {// This USDT2 macro, I don't know what it is, but it does not influence our analysis, we will ignore it for now#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */// Call Parker's unpark method p->unpark(); }Copy the code

Based on the above code, we need to know two classes for the native layer, JavaThread and Parker

class JavaThread: public Thread {
private:
  JavaThread*    _next;                          // The next thread inthe Threads list oop _threadObj; // The Java level thread object private: Parker* _parker; public: Parker*parker() { return_parker; } // omit code...Copy the code

The JavaThread class is very long, and only a few member variables are listed here. All we need to know is that it is aThread of the Native layer. The member variable _threadObj is aThread object of the Java layer. Let’s continue to focus on the implementation of the Parker class.

class Parker : public os::PlatformParker { private: volatile int _counter ; // omit code... }Copy the code

We focus on the _counter field. If _counter field > 0, it can be passed, that is, the park method will return directly, in addition, when the park method returns, _counter will be assigned a value of 0, and the unpark method can set _counter to 1, and wake up the currently waiting thread.

Parker’s parent class is OS ::PlatformParker. What does this class do? As we all know, Java is cross-platform, so the Thread defined in the application layer must depend on the specific platform. Different platforms have different implementation. For example, Linux is one set of code and Windows is another set. PlatformParker has different implementations depending on the platform. Five platforms are supported in the OpenJdk8 implementation

  • aix
  • bsd
  • linux
  • solaris
  • windows

We know that Linux is now a widely used operating system, such as the well-known Android is based on the Linux kernel, so here we choose Linux to analyze it. The corresponding file path hotspot\ SRC \ OS \ Linux \vm\os_linux.cpp

void Parker::unpark() { int s, status ; // pthread_mutex_t _mutex [1]; // pthread_cond_t _cond [2] ; status = pthread_mutex_lock(_mutex); assert (status == 0,"invariant"); s = _counter; // set _counter to 1 _counter = 1; // s records the number of _counter before unpark. If s < 1, it is possible that the thread is waiting and needs to be woken up.if(s < 1) {// Thread might be parked // _CUR_INDEX represents the index where CONd is usedif(_cur_index ! = 1) {/ / thread is definitely the parked / / WorkAroundNPTLTimedWaitHang arguments based on the virtual machine for different processing, this parameter is 1 by defaultif(WorkAroundNPTLTimedWaitHang) {/ / threads that are waiting awakens the target status = pthread_cond_signal (& _cond [_cur_index]); assert (status == 0,"invariant"); Pthread_mutex_unlock (_mutex) status = pthread_mutex_unlock(_mutex); assert (status == 0,"invariant");
      } elseStatus = pthread_mutex_unlock(_mutex); assert (status == 0,"invariant"); // Wake up the thread with a signal outside the mutex block. status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0,"invariant"); }}elseIf the thread is not waiting, pthread_mutex_unlock(_mutex) is returned; assert (status == 0,"invariant"); }}elseIf the thread is not waiting, pthread_mutex_unlock(_mutex) is returned; assert (status == 0,"invariant"); }}Copy the code

In Linux, pthread_mutex_lock is used to unlock pthread_mutex_unlock, and pthread_mutex_unlock is used to unlock pthread_mutex_unlock. Wake up is the pthread_cond_signal. Next, parse the park method of LockSupport.

LockSupport.park

Let’s take a look at the implementation of the Java layer park method

    public static void park() {
        UNSAFE.park(false, 0L);
    }
Copy the code

Implementation in Unsafe

public native void park(boolean var1, long var2);
Copy the code

It’s still a native method, so let’s follow along

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jBoolean isAbsolute, jlong time)) // thread->parker()->park(isAbsolute ! = 0, time); // omit code...Copy the code

Non-critical code is omitted, the focus is on park, this thread we have met before, is a native of JavaThread object, then calls the park, method of Parker, continue to go in Linux platform os_linux… CPP implementation

Void Parker::park(bool isAbsolute, jlong time) {// Set _counter to 0 and return _counterif (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // Determine whether the thread has been interruptedif (Thread::is_interrupted(thread, false)) {
    return; } // Next, demultiplex/decode time arguments timespec absTime; // The park method takes isAbsolute =falseTime = 0, so it's going to keep going downif (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; If (time >0) {// If (time >0) {// If (time >0) {// If (time >0); UnpackTime Calculates the time of absTime unpackTime(&absTime, isAbsolute, time); } ThreadBlockInVM tbivm(jt); / / to judge whether the Thread is interrupted again, without interruption, and try to acquire a mutex, if failed, direct return if (Thread: : is_interrupted (Thread, false) | | pthread_mutex_trylock (_mutex)! = 0) { return; } int status ; If (_counter > 0) {// No wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); // Insert a write barrier to ensure visibility, as described below OrderAccess::fence(); return; } // omit the assert code OSThreadWaitState osts(thread-> osThread (), false /* not object.wait () */); Jt ->set_suspend_equivalent(); // Set JavaThread's _suspend_equivalent to true, representing that the thread is paused. // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed timed Status = pthread_cond_wait (&_cond[_cur_index], _mutex); } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // The thread enters the wait with a timeout, The internal implementation calls the pthread_cond_timedwait system call status = OS ::Linux:: safe_cond_timedWait (&_cond[_cur_index], _mutex, &absTime); if (status ! = 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // omit assert code // reset _counter to 0 _counter = 0; Status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "invariant") ; // Insert write barrier OrderAccess::fence(); Inline void OrderAccess::fence() {// If it is a multicore CPU if (OS ::is_MP()) {// Always use locked addl since mfence is sometimes expensive // #ifdef AMD64 __asm__ Volatile ("lock; Addl $0,0(%% RSP)" : : : "cc", "memory"); #else __asm__ volatile ("lock; Addl $0,0(%%esp)" : : : "cc", "memory"); #endif } }Copy the code

The above analysis of the implementation principle of Park, with the knowledge of the previous analysis of unpark method, park method should be easy to understand.

Harvest and summary

Read LockSupport source code, you can sum up a few points

  1. If _counter=0, the calling park thread will wait until it is awakened by unpark. If unpack is called first and park is called again, it will return directly. And consume _counter (set to 0).
  2. Pthread_mutex_lock: pthread_mutex_unlock: pthread_mutex_unlock: Pthread_cond_signal

Finally, I’d like to mention a little bit about thread state in the Java layer, which may not be very clear to some of you, so I’ll summarize. There are six Java thread states.

  1. NEW (NEW thread, not yet called start)
  2. RUNNABLE (called start, running, or waiting for the operating system to dispatch CPU time slices)
  3. BLOCKED (Synchronied, waiting for monitorEnter)
  4. WAITING (wait, locksupport. park will enter the state)
  5. TIMED_WAITING (wait with wait time, locksupport. parkNanos, parkUtil)
  6. TERMINATED (thread execution TERMINATED)

About concurrent, our software engineer to do, is to control the thread in the right transformation between the state and the so-called “to do a good job, must first sharpen his”, provide various concurrent JDK tools, we only understand them, to the use of flexible and efficient, this is my record “chew through Java concurrency” series of beginner’s mind, share with you!