In the previous article on the JDK's delay queue, thread waiting was ultimately implemented through LockSupport.park. How do park and its timed variant work under the hood?

LockSupport’s park and unpark methods

public static void park() {
    UNSAFE.park(false, 0L);
}

public static void parkNanos(long nanos) {
    if (nanos > 0)
        UNSAFE.park(false, nanos);
}

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

LockSupport.park is actually implemented via Unsafe.park, which is a native method.

/**
 * Blocks current thread, returning when a balancing
 * {@code unpark} occurs, or a balancing {@code unpark} has
 * already occurred, or the thread is interrupted, or, if not
 * absolute and time is not zero, the given time nanoseconds have
 * elapsed, or if absolute, the given deadline in milliseconds
 * since Epoch has passed, or spuriously (i.e., returning for no
 * "reason"). Note: This operation is in the Unsafe class only
 * because {@code unpark} is, so it would be strange to place it
 * elsewhere.
 */
public native void park(boolean isAbsolute, long time);

The Unsafe park method of the JVM

As the following HotSpot code shows, Unsafe_Park delegates to the park method of the current thread's Parker object, which performs the actual waiting.

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

The Parker object is defined inside the thread class in thread.hpp:

  private:
  Parker _parker;
 public:
  Parker* parker() { return &_parker; }

os_posix.cpp contains the POSIX (Linux) implementation of Parker::park. Step by step (the full source follows the list):

  1. Atomically swap _counter to 0 (Atomic::xchg) and check the old value; if it was greater than 0, a permit was already available, so return immediately.
  2. Get the current JavaThread.
  3. Check whether the thread has a pending interrupt; if so, return directly (that is, a thread in the interrupted state ignores park and does not block).
  4. If time is less than 0, or time is absolute and equal to 0, there is nothing to wait for, so return immediately.
  5. If time is greater than 0, convert it to an absolute time absTime.
  6. Construct a ThreadBlockInVM object (entering the blocked-in-VM state for safepoints) and call pthread_mutex_trylock to acquire the Parker's mutex; if the lock cannot be acquired, someone must be concurrently unparking, so return immediately.
  7. Check whether _counter is greater than 0; if so, a permit arrived in the meantime: reset _counter to 0, release the mutex, and return.
  8. Call OrderAccess::fence() to insert a memory barrier so the locked and lock-free paths cannot be reordered against each other.
  9. Construct an OSThreadWaitState object.
  10. If time is 0, call pthread_cond_wait to wait indefinitely; otherwise call pthread_cond_timedwait with the absolute deadline absTime.
  11. On wakeup, reset _counter to 0 and call pthread_mutex_unlock to release the mutex.
  12. Call OrderAccess::fence() again to prevent reordering.
// Parker::park decrements count if > 0, else does a condvar wait.  Unpark
// sets count to 1 and signals condvar.  Only one thread ever waits
// on the condvar. Contention seen when trying to park implies that someone
// is unparking you, so don't wait. And spurious returns are fine, so there
// is no need to track notifications.

void Parker::park(bool isAbsolute, jlong time) {

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(&_counter, 0) > 0) return;

  JavaThread *jt = JavaThread::current();

  // Optional optimization -- avoid state transitions if there's
  // an interrupt pending.
  if (jt->is_interrupted(false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex. If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Can't access interrupt state now that we are _thread_blocked. If we've
  // been interrupted since we checked above then _counter will be > 0.

  // Don't wait if cannot get lock since interference arises from
  // unparking.
  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0) { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();
}
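
To make the counter-plus-condvar pattern concrete, here is a minimal C sketch of the same idea. All names (my_park, my_unpark, counter) are mine, not HotSpot's; error handling is omitted, and unlike the real Parker, which simply tolerates spurious returns, this sketch re-checks the permit in a loop:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

/* A one-permit "semaphore" built from a counter, a mutex and a condvar. */
static atomic_int counter = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void my_park(void) {
    /* Fast path: consume a pending permit without taking the lock. */
    if (atomic_exchange(&counter, 0) > 0)
        return;
    pthread_mutex_lock(&mutex);
    /* Re-check under the lock: an unpark may have raced in. */
    while (counter == 0)
        pthread_cond_wait(&cond, &mutex);
    counter = 0;
    pthread_mutex_unlock(&mutex);
}

static void my_unpark(void) {
    pthread_mutex_lock(&mutex);
    counter = 1;                   /* at most one permit, like Parker */
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&cond);
}

static void *worker(void *arg) {
    printf("worker parking...\n");
    my_park();
    printf("worker unparked\n");
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);
    sleep(1);
    my_unpark();
    pthread_join(t, NULL);
    return 0;
}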

How pthread_cond_timedwait is implemented on Linux

The pthread_cond_timedwait function is located in glibc's pthread_cond_wait.c and delegates to __pthread_cond_wait_common:

/* See __pthread_cond_wait_common.  */
int
___pthread_cond_timedwait64 (pthread_cond_t *cond, pthread_mutex_t *mutex,
                             const struct __timespec64 *abstime)
{
  /* Check parameter validity.  This should also tell the compiler that
     it can assume that abstime is not NULL.  */
  if (! valid_nanoseconds (abstime->tv_nsec))
    return EINVAL;

  /* Relaxed MO is suffice because clock ID bit is only modified
     in condition creation.  */
  unsigned int flags = atomic_load_relaxed (&cond->__data.__wrefs);
  clockid_t clockid = (flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK)
                      ? CLOCK_MONOTONIC : CLOCK_REALTIME;
  return __pthread_cond_wait_common (cond, mutex, clockid, abstime);
}
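
One detail worth making concrete: abstime is an absolute deadline, not a relative duration, which is exactly why Parker::park converted a relative time with to_abstime above. A minimal usage sketch of my own (using the default CLOCK_REALTIME; a condattr can switch a condvar to CLOCK_MONOTONIC, which is what sets the flag glibc reads here):

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>

int main(void) {
    pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t c = PTHREAD_COND_INITIALIZER;

    /* pthread_cond_timedwait takes an ABSOLUTE deadline: now + 2s. */
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += 2;

    pthread_mutex_lock(&m);
    int rc = 0;
    /* Nothing ever signals c, so after any spurious wakeups (rc == 0)
       we eventually get ETIMEDOUT. */
    while (rc == 0)
        rc = pthread_cond_timedwait(&c, &m, &deadline);
    pthread_mutex_unlock(&m);

    printf("returned %d (ETIMEDOUT is %d)\n", rc, ETIMEDOUT);
    return 0;
}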

__pthread_cond_wait_common in turn waits via __futex_abstimed_wait_cancelable64 (irrelevant code omitted):

static __always_inline int
__pthread_cond_wait_common (pthread_cond_t *cond, pthread_mutex_t *mutex,
                            clockid_t clockid,
                            const struct __timespec64 *abstime)
{
  /* ... omitted ... */
  err = __futex_abstimed_wait_cancelable64 (cond->__data.__g_signals + g, 0,
                                            clockid, abstime, private);
  /* ... omitted ... */
}

__futex_abstimed_wait_cancelable64 simply forwards to __futex_abstimed_wait_common:

int
__futex_abstimed_wait_cancelable64 (unsigned int* futex_word,
                                    unsigned int expected, clockid_t clockid,
                                    const struct __timespec64* abstime,
                                    int private)
{
  return __futex_abstimed_wait_common (futex_word, expected, clockid,
                                       abstime, private, true);
}


__futex_abstimed_wait_common dispatches to __futex_abstimed_wait_common64 or __futex_abstimed_wait_common32, depending on whether the platform supports 64-bit time_t futex syscalls:

static int
__futex_abstimed_wait_common (unsigned int* futex_word,
                              unsigned int expected, clockid_t clockid,
                              const struct __timespec64* abstime,
                              int private, bool cancel)
{
  int err;
  unsigned int clockbit;

  /* Work around the fact that the kernel rejects negative timeout values
     despite them being valid.  */
  if (__glibc_unlikely ((abstime != NULL) && (abstime->tv_sec < 0)))
    return ETIMEDOUT;

  if (! lll_futex_supported_clockid (clockid))
    return EINVAL;

  clockbit = (clockid == CLOCK_REALTIME) ? FUTEX_CLOCK_REALTIME : 0;
  int op = __lll_private_flag (FUTEX_WAIT_BITSET | clockbit, private);

#ifdef __ASSUME_TIME64_SYSCALLS
  err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
                                        private, cancel);
#else
  bool need_time64 = abstime != NULL && !in_time_t_range (abstime->tv_sec);
  if (need_time64)
    {
      err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
                                            private, cancel);
      if (err == -ENOSYS)
        err = -EOVERFLOW;
    }
  else
    err = __futex_abstimed_wait_common32 (futex_word, expected, op, abstime,
                                          private, cancel);
#endif

  switch (err)
    {
    case 0:
    case -EAGAIN:
    case -EINTR:
    case -ETIMEDOUT:
    case -EINVAL:
    case -EOVERFLOW:  /* Passed absolute timeout uses 64 bit time_t type, but
                         underlying kernel does not support 64 bit time_t futex
                         syscalls.  */
      return -err;

    case -EFAULT: /* Must have been caused by a glibc or application bug.  */
    case -ENOSYS: /* Must have been caused by a glibc bug.  */
    /* No other errors are documented at this time.  */
    default:
      futex_fatal_error ();
    }
}
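
All of these paths bottom out in the futex(2) system call, which glibc does not expose as a wrapper function. As an illustration only, here is a direct invocation using the classic FUTEX_WAIT op with a relative timeout; glibc itself, as the code above shows, builds the FUTEX_WAIT_BITSET op with an absolute deadline:

#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

static uint32_t futex_word = 0;

int main(void) {
    struct timespec timeout = { .tv_sec = 1, .tv_nsec = 0 };
    /* FUTEX_WAIT: sleep while *uaddr == expected (0 here), until a
       FUTEX_WAKE arrives or the RELATIVE timeout expires. */
    long rc = syscall(SYS_futex, &futex_word, FUTEX_WAIT, 0,
                      &timeout, NULL, 0);
    printf("futex returned %ld, errno=%s\n", rc, strerror(errno));
    return 0;
}

Running this under strace shows the futex call directly, the same primitive the glibc chain above ultimately issues.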

__futex_abstimed_wait_common64 issues the futex_time64 system call through the INTERNAL_SYSCALL_CANCEL macro:

static int
__futex_abstimed_wait_common64 (unsigned int* futex_word,
                                unsigned int expected, int op,
                                const struct __timespec64* abstime,
                                int private, bool cancel)
{
  if (cancel)
    return INTERNAL_SYSCALL_CANCEL (futex_time64, futex_word, op, expected,
				    abstime, NULL /* Unused.  */,
				    FUTEX_BITSET_MATCH_ANY);
  else
    return INTERNAL_SYSCALL_CALL (futex_time64, futex_word, op, expected,
				  abstime, NULL /* Ununsed.  */,
				  FUTEX_BITSET_MATCH_ANY);
}

The macro definition of the system call wrapper:

/* Issue a syscall defined by syscall number plus any other argument
   required.  Any error will be returned unmodified (including errno).  */
#define INTERNAL_SYSCALL_CANCEL(...) \
  ({                                                     \
    long int sc_ret;                                     \
    if (NO_SYSCALL_CANCEL_CHECKING)                      \
      sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__);      \
    else                                                 \
      {                                                  \
        int sc_cancel_oldtype = LIBC_CANCEL_ASYNC ();    \
        sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__);    \
        LIBC_CANCEL_RESET (sc_cancel_oldtype);           \
      }                                                  \
    sc_ret;                                              \
  })
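
The LIBC_CANCEL_ASYNC / LIBC_CANCEL_RESET pair in this macro is what makes pthread_cond_timedwait a cancellation point: a thread blocked inside the futex syscall can still be cancelled. A small demo of that observable behavior (my own sketch; the cleanup handler releases the mutex the cancelled thread holds):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t c = PTHREAD_COND_INITIALIZER;

/* Cancellation unwinds while the mutex is held; release it here. */
static void unlock_on_cancel(void *arg) {
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}

static void *blocker(void *arg) {
    pthread_mutex_lock(&m);
    pthread_cleanup_push(unlock_on_cancel, &m);
    while (1)                       /* nobody ever signals c */
        pthread_cond_wait(&c, &m);  /* cancellation point */
    pthread_cleanup_pop(1);         /* not reached; pairs with push */
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, blocker, NULL);
    sleep(1);                       /* let it block in the futex wait */
    pthread_cancel(t);              /* wakes and unwinds the waiter */
    pthread_join(t, NULL);
    printf("waiter was cancelled while blocked\n");
    return 0;
}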

To summarize: this article traced the underlying implementation of LockSupport.park and its timed wait, from the JDK through HotSpot's Parker down to glibc's futex wrappers. I have not yet located the kernel-side source of the futex system call itself and will keep tracking it in a follow-up; readers who know the full picture are welcome to share, thank you.