Because wait() and notify() are used in conjunction with synchronized, synchronization cannot use a display Lock. So the display lock needs to provide its own wait/notification mechanism, and Condition comes into being.

The await() method in Condition is equivalent to the wait() method of Object, and the signal() method in Condition is equivalent to the notify() method of Object, SignalAll () in Condition is equivalent to notifyAll() on Object. The wait(),notify(), and notifyAll() methods of Object are bundled with the synchronized keyword. Condition is bundled with a “mutex “/” shared lock”.

1. Function list

  1. Void await() throws InterruptedException The current thread enters the wait state until it is notified of signal or interrupted and returns from await().

  2. Void awaitUninterruptibly() The current thread enters the wait state until notified and does not respond to interrupts;

  3. Long awaitNanos(Long nanosTimeout) throws InterruptedException adds a timeout response based on the return condition of interface 1. The return value represents the time currently remaining. If nanosTimeout is awakened before Return value = nanosTimeout – the actual elapsed time, return value <= 0 indicates timeout;

  4. Throws InterruptedException. Boolean await(long time, TimeUnit Unit) throws InterruptedException. The return value is true/false, awakened before time, true, and false after timeout.

  5. Boolean awaitUntil(Date deadline) throws InterruptedException The current thread enters the wait state until it is notified at a specified time in the future. Returns true if the thread is not notified at a specified time. Returns false.

  6. Void signal() wakes up a thread waiting on Condition;

  7. Void signalAll() wakes up all threads waiting on the Condition.

2. Implementation

See the computer had done a problem, take it as an example:

Write a Java application that has three processes: student1, student2, and teacher. Student1 sleeps for 1 minute and student2 sleeps for 5 minutes. The Teacher “woke up” the dormant thread student1 after output 4 sentences “class”; Student1 wakes up the dormant thread student2.

2.1 Implementation 1:

Based on Object and synchronized implementation.

package com.fantJ.bigdata;

/**
 * Created by Fant.J.
 * 2018/7/2 16:36
 */
public class Ten {
    static class Student1{
        private boolean student1Flag = false;
        public synchronized boolean isStudent1Flag() {
            System.out.println("Student 1 starts sleeping for 1 minute");
            if(! this.student1Flag){ try { System.out.println(Student 1 is asleep);
                    wait(1 * 1000 * 60); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Student 1 is awakened);

            return student1Flag;
        }

        public synchronized void setStudent1Flag(boolean student1Flag) {
            this.student1Flag = student1Flag;
            notify();
        }
    }
    static class Student2{
        private boolean student2Flag = false;
        public synchronized boolean isStudent2Flag() {
            System.out.println("Student 2 starts sleeping for 5 minutes");
            if(! this.student2Flag){ try { System.out.println(Student 2 is asleep);
                    wait(5 * 1000 * 60); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Student 2 is awakened);
            return student2Flag;
        }

        public synchronized void setStudent2Flag(boolean student2Flag) {
            notify();
            this.student2Flag = student2Flag;
        }
    }
    static class Teacher{
        private boolean teacherFlag = true;
        public synchronized boolean isTeacherFlag() {
            if(! this.teacherFlag){ try {wait(a); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("The teacher is ready to yell for class.");

            return teacherFlag;
        }

        public synchronized void setTeacherFlag(boolean teacherFlag) {
            this.teacherFlag = teacherFlag;
            notify();
        }
    }
    public static void main(String[] args) {
        Student1 student1 = new Student1();
        Student2 student2 = new Student2();
        Teacher teacher = new Teacher();

        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i<4; i++){ System.out.println("Class");
                }
                teacher.isTeacherFlag();
                System.out.println(Student 1 is woken up for 1s.);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                student1.setStudent1Flag(true); }}); Thread s1 = new Thread(newRunnable() {
            @Override
            public void run() {
                student1.isStudent1Flag();
                System.out.println("Ready to wake student 2. It takes 1s to wake up.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                student2.setStudent2Flag(true); }}); Thread s2 = new Thread(newRunnable() {
            @Override
            public void run() { student2.isStudent2Flag(); }}); s1.start(); s2.start(); t.start(); }}Copy the code

Of course, it is possible to use notifyAll with less code. This implementation, though complex, is much better than notifyAll() in terms of single performance because there is no waste of resources due to lock contention. But as you can see, the code is complex, and instances need to be well isolated from each other.

2.2 Implementation two:

Based on Condition, ReentrantLock implementation.

public class xxx{
        private int signal = 0;
        public Lock lock = new ReentrantLock();
        Condition teacher = lock.newCondition();
        Condition student1 = lock.newCondition();
        Condition student2 = lock.newCondition();

        public void teacher(){
            lock.lock();
            while(signal ! = 0){ try { teacher.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("The teacher calls class.");
            signal++;
            student1.signal();
            lock.unlock();
        }
        public void student1(){
            lock.lock();
            while(signal ! = 1){ try { student1.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Student 1 is awake. Ready to wake student 2.);
            signal++;
            student2.signal();
            lock.unlock();
        }
        public void student2(){
            lock.lock();
            while(signal ! = 2){ try { student2.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Student 2 wakes up); signal=0; teacher.signal(); lock.unlock(); } public static void main(String[] args) { ThreadCommunicate2 ten = new ThreadCommunicate2(); new Thread(() -> ten.teacher()).start(); new Thread(() -> ten.student1()).start(); new Thread(() -> ten.student2()).start(); }}Copy the code

Condition depends on the Lock interface. The basic code for generating a Condition is that lock.newCondition() calls await() and signal() methods of the Condition, both of which must be protected by Lock. It must be used between lock.lock() and lock.unlock.

As you can see, I have removed the synchronized method keyword and added lock.lock() before and after each synchronized method; lock.unlock(); To obtain/release the lock, and cast the desired Condition before releasing the lock. Similarly, we use signal to communicate between threads.

3. Condition implements bounded queues

The reason for using it for bounded queues is that we can use Condition to block (when the queue is empty or full). This saves us a lot of trouble.

public class MyQueue<E> {

    private Object[] objects;
    private Lock lock = new ReentrantLock();
    private Condition addCDT = lock.newCondition();
    private Condition rmCDT = lock.newCondition();

    private int addIndex;
    private int rmIndex;
    private int queueSize;

    MyQueue(int size){
        objects = new Object[size];
    }
    public void add(E e){
        lock.lock();
        while (queueSize == objects.length){
            try {
                addCDT.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        objects[addIndex] = e;
        System.out.println("Added data"+"Objects["+addIndex+"] ="+e);
        if (++addIndex == objects.length){
            addIndex = 0;
        }
        queueSize++;
        rmCDT.signal();
        lock.unlock();

    }
    public Object remove(){
        lock.lock();
        while (queueSize == 0){
            try {
                System.out.println("Queue empty");
                rmCDT.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Object temp = objects[rmIndex];
        objects[rmIndex] = null;
        System.out.println("Remove the data"+"Objects["+rmIndex+"] = null");
        if (++rmIndex == objects.length){
            rmIndex = 0;
        }
        queueSize--;
        addCDT.signal();
        lock.unlock();
        return temp;
    }
    public void foreach(E e){
        if (e instanceof String){
            Arrays.stream(objects).map(obj->{
                if (obj == null){
                    obj = "";
                }
                return obj;
            }).map(Object::toString).forEach(System.out::println);
        }
        if (e instanceof Integer){
            Arrays.stream(objects).map(obj -> {
                if (obj == null ){
                    obj = 0;
                }
                returnobj; }).map(object -> Integer.valueOf(object.toString())).forEach(System.out::println); }}}Copy the code

The add method simply adds data to the queue. Remove removes data from the queue in FIFO. The foreach method is a utility method that looks at the contents of a queue, and it’s easy to see that it’s used for traversal.

    public static void main(String[] args) {
        MyQueue<Integer> myQueue = new MyQueue<>(5);
        myQueue.add(5);
        myQueue.add(4);
        myQueue.add(3);
//        myQueue.add(2);
//        myQueue.add(1);
        myQueue.remove();
        myQueue.foreach(5);
    }
Copy the code
Add data Objects[0] = 5 Add data Objects[1] = 4 Add data Objects[2] = 3 Remove data Objects[0] = NULL 0 4 3 0 0Copy the code

4. Source code analysis

ReentrantLock.class

public Condition newCondition() {
    return sync.newCondition();
}
Copy the code

The sync source:

private final Sync sync;
Copy the code

There is a newCondition() method in the Sync class:

final ConditionObject newCondition() {
    return new ConditionObject();
}
Copy the code
public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}
Copy the code
public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
Copy the code
Await the source code:
public final void await() throws InterruptedException { // 1. If the current thread is interrupted, an interrupt exception is thrownif(Thread.interrupted()) throw new InterruptedException(); // add the node to the Condition queue. If lastWaiter is in the Condition queue, it will be kicked out of the Condition queue. Node node = addConditionWaiter(); Long savedState = fullyRelease(node); long savedState = fullyRelease(node); int interruptMode = 0; // 4. Why is there a judgment on the waiting queue in AQS? / / answer: // If it is not in the AQS, the current thread is parched. If it is in the AQS, it exits the loop. If it is interrupted, it exits the loopwhile(! isOnSyncQueue(node)) { LockSupport.park(this);if((interruptMode = checkInterruptWhileWaiting(node)) ! = 0)break; } // 5. The thread has already been woken up by signal() or signalAll() and has exited 4whileThe loop // spin waits to try to acquire the lock again, calling the acquireQueued methodif(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! = null) unlinkCancelledWaiters();if(interruptMode ! = 0) reportInterruptAfterWait(interruptMode); }Copy the code
  1. Adds the current thread to the Condition lock queue. In particular, unlike the AQS queue, we enter the FIFO queue of Condition.

  2. Releases the lock. Here you can see that the lock is released, otherwise no other thread can get the lock and a deadlock occurs.

  3. The spin hangs until it is woken up (signal puts it back into the waiting queue of the AQS) or times out or the cat reacts, etc.

  4. AcquireQueued. And releases myself from the Condition’s FIFO queue, indicating that I no longer need the lock (I already have it).

Signal () the source code
public final void signal() {
            if(! IsHeldExclusively ()) // Throw an exception if the synchronization state is not exclusive to the current thread. As you can see from this, Condition can only be used with an exclusive class synchronization component. throw new IllegalMonitorStateException(); Node first = firstWaiter;if(first ! = null) // Notify the node at the head of the queue.doSignal(first); 
        }

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while(! TransferForSignal (first) && // The transferForSignal method attempts to wake up the current node. If the wake up fails, the transferForSignal method tries to wake up the successor node of the current node. (first = firstWaiter) ! = null); } final Boolean transferForSignal(Node Node) {// If the status of the Node is CONDITION, the Node changes to 0 to be added to the synchronization queue. If the current status is not CONDITION, the node wait has been interrupted. This method returnsfalse.doThe Signal() method continues to try to wake up the successor nodes of the current nodeif(! compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); Int ws = p.waitStatus; int ws = p.waitStatus; If the status of the precursor is CANCELLED(>0) or the status of the precursor is set to SIGNAL fails, the thread corresponding to the current node will be awakened immediately and the acquireQueued method will be executed. This method retries to set the node's precursor state to SIGNAL and park the thread again; If the current status of the precursor node is set to SIGNAL successfully, the thread does not need to wake up immediately, but will wake up automatically when its precursor becomes the first node in the synchronization queue and releases the synchronization state. // In fact, I think it is ok not to add this judgment condition. If the thread is woken up in advance and the node enters the acquireQueued method and finds that its precursor is not the first node, it will block again and wake up again when its precursor becomes the first node and releases the lock. If this condition is added, when a thread wakes up, its precursor node must be the first node, and the thread has the opportunity to directly obtain the synchronization state, thus avoiding secondary blocking and saving hardware resources.if(ws > 0 || ! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
    }
Copy the code

Signal is used to wake up the first non-cancelled node thread in the Condition queue, while signalAll is used to wake up all non-cancelled node threads. In essence, one or all nodes are removed from the Condition queue and placed on the AQS waiting queue. Although all nodes may be awakened, it is important to remember that only one thread can acquire the lock, and the other threads that do not acquire the lock still need to spin to wait, as mentioned in Step 4 above.

Implementation Process Overview

We know that the essence of Lock is AQS. The queue maintained by AQS is the queue currently waiting for resources. AQS will wake up all nodes in the queue from the front to the back after the resource is released, so that their corresponding threads can resume execution until the queue is empty. The Condition itself maintains a queue whose purpose is to maintain a queue waiting for signal. However, the two queues have different functions, and in fact, each thread only has one of the two queues at the same time. The flow looks like this:

  1. Thread 1 callreentrantLock.lockAttempt to obtain the lock. If successful, returns fromAQSRemoves threads from the queue of Otherwise block, keep inAQSIs waiting in the queue.
  2. Thread 1 callawaitWhen a method is called, the corresponding operation is added toConditionWaiting for signal; Release the lock at the same time.
  3. When the lock is released, the head of the AQS queue wakes up, so thread 2 will acquire the lock.
  4. Thread 2 callsignalMethod, this timeConditionThread 1 has only one node in the wait queue, so it is taken out and added to the wait queue of AQS. Note that thread 1 is not awakened at this point, but is added to the AQS wait queue.
  5. signalMethod completes, thread 2 callsunLock()Method to release the lock. Since there is only thread 1 in the AQS, thread 1 is awakened and thread 1 resumes execution. So: sendsignalThe signal is just going to beConditionThe threads in the queue add toAQSIs waiting in the queue. Only to sendsignalSignal to the thread callreentrantLock.unlock()Once the lock is released, these threads will wake up.

It can be seen that the whole cooperation process is realized by the node moving back and forth between the waiting queue of AQS and the waiting queue of Condition. Condition, as a Condition class, maintains a waiting queue of signals well by itself, and adds nodes to the waiting queue of AQS at the right time to realize the wake-up operation.

Condition the nature of the wait for a notice, please refer to: www.cnblogs.com/sheeva/p/64…