When you use multiple threads to run multiple tasks at the same time, you can synchronize the behavior of two tasks by using locks so that one task does not interfere with the other task’s resources. That is, if two tasks step into a shared resource alternately, you can use mutual exclusion to ensure that only one task can access the resource at any one time.

Collaboration between threads

Now that the above problem is solved, the next step is to make tasks work cooperatively with each other so that multiple tasks can work together to solve a problem. The problem now is not mutual interference, but mutual cooperation. The key to solving this kind of problem is that some parts must be solved before others can be solved.

When tasks collaborate, the key issue is the handshake between those tasks. To achieve the handshake, we use the same basic feature: mutex. In this case, mutex ensures that only one task can respond to a signal, thus eliminating any possible race conditions. On mutex, we have added a way for the task to suspend itself until some external condition changes indicating that it is time for the task to begin.

Wait () and notifyAll ()

Wait () allows you to wait for a condition to change, usually by another task. You don’t want to be running an empty loop while your task tests this condition, which is called busy waiting and is bad CPU usage. So wait() suspends the task when the external condition changes, and only when notif() or notifAll() occurs does the task wake up to check for the change. Therefore, wait() provides a way to synchronize activities between tasks.

The lock is not released when sleep() is called, nor when yield() is called. When a task encounters a call to wait() in a method, thread execution is suspended and the lock on the object is released. This means that another task can acquire the lock, so other synchronized methods on the modified object can be called during wait(). Thus, when you call wait(), you’re declaring: “I’ve done everything, but I want other synchronized actions to be performed if the condition is when.”

There are two forms of wait():

  • The first takes milliseconds: the time to pause again.
    • The object lock is released during wait().
    • Execution can be resumed from wait() by notif() or notifAll(), or by instruction expiration.
  • The second type of wait() takes no arguments.
    • This wait() wirelessly waits until the thread receives a notif() or notifAll().

One special aspect of wait(), notif(), and notifAll() is that they are part of the base Object class, not Thread. What was just a thread became part of a common base class. The reason is that the locks that these methods operate on are also part of all objects. So you can put wait() in any synchronization control method, regardless of whether the class inherits from Thread or Runnable. In practice, wait(), notif(), or notifAll() can only be called in synchronized methods or blocks of synchronized code. If the operation of these methods in the synchronized code block, the program can compile, but get IllegalMonitorStateException anomalies at run time. This means that you must have a lock on the object before calling wait(), notif(), or notifAll().

For example, if you send notifAll() to object X, you must do so in the synchronized control block that gets the lock on x:

synchronized(x){
  x.notifAll();
}
Copy the code

Let’s look at an example: one is to apply wax to Car, and one is to polish it. The polishing task cannot perform its work until the waxing task is complete, and the waxing task must wait for the polishing task to complete before applying another layer of wax.

public class Car {
  // The waxed and polished state
	private boolean waxOn = false;
	/ / wax
	public synchronized void waxed(a) {
		waxOn = true;
		notifyAll();
	}

	/ / polishing
	public synchronized void buffed(a) {
		waxOn = false;
		notifyAll();
	}

	// Finish polishing is suspended and the waxing task is about to begin
	public synchronized void waitForWaxing(a) throws InterruptedException{
		while (waxOn == false) { wait(); }}// The waxing end is suspended and is about to start the casting task
	public synchronized void waitForBuffing(a) throws InterruptedException{
		while (waxOn == true) { wait(); }}}Copy the code

Start the waxing task:

public class WaxOn implements Runnable{
	private Car car;

	protected WaxOn(Car car) {
		super(a);this.car = car;
	}


	@Override
	public void run(a) {
		try {
			while(! Thread.interrupted()) { System.out.println("Wax one");
				TimeUnit.MICROSECONDS.sleep(200);
				// Start waxing
				car.waxed();
				// The current task is suspendedcar.waitForBuffing(); }}catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println(" Exiting via interrupt");
		}
		System.out.println("Ending wax on task"); }}Copy the code

Start the polishing task:

public class WaxOff implements Runnable{
	private Car car;

	protected WaxOff(Car car) {
		super(a);this.car = car;
	}

	@Override
	public void run(a) {
		// TODO Auto-generated method stub
		try {
			while(! Thread.interrupted()) {// If it is still waxing, hang it
				car.waitForWaxing();
				System.out.println("Wax off");
				TimeUnit.MICROSECONDS.sleep(200);
				// Start polishingcar.buffed(); }}catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("Wxtiing via interrupt");
		}
		System.out.println("Ending wax off task"); }}Copy the code

The test class:

public class WaxOmatic {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Car car = new Car();
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(new WaxOff(car));
		service.execute(new WaxOn(car));
		// Pause for 2 seconds
		TimeUnit.SECONDS.sleep(1);
		// Close all tasksservice.shutdownNow(); }}Copy the code

Execution Result:

/... Wax one Wax off Wax one Wax off Wax one Wax off Exiting via interrupt Wxtiing via interrupt Ending wax on task Ending wax off taskCopy the code

Check the WaxOn flag in waitForWaxing(), and if it is false, the call task will be suspended. It is important that this behavior occurs in synchronized methods. Because in this method the task has already acquired the lock. When you call wait(), the thread is suspended and the lock is released. Releasing the lock is essential because in order to safely change the state of the object, some other task must be able to acquire the lock.

Waxon.run () represents the first step in waxing the car, and it does what it does: call sleep() to simulate the waxing time, then tell the car that the waxing is over, and call waitForWaxing(), which calls wait() to suspend the current waxing task. Until the WaxOff task calls buffed() on both vehicles, changing the state and calling notfiAll() to wake them up again. Flip it over, too, and when you run the program, you see control passing back and forth between the two tasks, the two-step process repeating itself.

Missed signals

When two threads collaborate using notif()/wait() or notifAll()/wait(), it is possible to miss a signal. Assume that thread T1 is the notifying thread T2, and both threads are implemented as follows:

T1:
synchronized(X){
  // Set a condition for T2
  <setup condition for T2>
  x.notif();
}

T2:
while(someCondition){
  //Potit
  synchronized(x){ x.wait(); }}Copy the code

The example above assumes that T2 finds it true() for someCondition. The thread scheduler may switch to T1 during Potit execution. T1 will perform a condition reset and call wake up. When T2 continues, it does not realize that the condition has changed and therefore blindly enters wait(). Now wake has already been called before, and T2 will wait indefinitely for the wake signal.

The solution to this problem is to prevent race conditions on the someCondition variable:

synchronized(x){
  while(someCondition){ x.wait(); }}Copy the code

With notifAll notif () ()

There can be multiple tasks suspended in wait() on a single Car object, so calling notifyAll() is safer than calling notify(). Using notify() instead of notifyAll() is an optimization. With notify(), only one of the many tasks waiting for the same lock is woken up, so if you want to use notify(), you must ensure that the appropriate task is woken up. Also, with notify(), all tasks must wait for the same condition, because if you have multiple tasks waiting for different conditions, you won’t know if you’ve woken up the right task. If notfiy() is used, only one task must benefit from a condition change. Finally, these restrictions must always apply to all possible subclasses. NotifyAll () must be used if any of these rules are not met.

In Java’s threading mechanism, a description goes like this: notifyAll() wakes up all waiting tasks. Does this mean that any task in wait() state will be awakened by any call to notifyAll() anywhere in the program? This is not the case as shown in the following example, when notifyAll() is called for a particular lock, only the task waiting for the lock is awakened:

public class Blocker {
	 synchronized void waitingCall(a) {
	    try {
	      while(! Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() +""); }}catch(InterruptedException e) {
	      // OK to exit this way}}synchronized void prod(a) {
		  notify();
	  }

	  synchronized void prodAll(a) { notifyAll(); }}Copy the code

Create a Task Task

public class Task implements Runnable {
	  static Blocker blocker = new Blocker();
	  public void run(a) { blocker.waitingCall(); }}Copy the code

Create task Task2:

public class Task2 implements Runnable {
	  // A separate Blocker object:
	  static Blocker blocker = new Blocker();
	  public void run(a) { blocker.waitingCall(); }}Copy the code

The test class:

public class NotifyVsNotifyAll {
	public static void main(String[] args) throws Exception{
		ExecutorService service = Executors.newCachedThreadPool();
		for (int i = 0; i < 3; i++) {
			service.execute(new Task());
		}
			service.execute(new Task2());
			Timer timer = new Timer();
			timer.scheduleAtFixedRate(new TimerTask() {
				boolean prod = true;
				@Override
				public void run(a) {
					// TODO Auto-generated method stub
					if (prod) {
						System.out.println("notify");
						Task.blocker.prod();
						prod = false;
					}else {
						System.out.println("notifyAll");
						Task.blocker.prodAll();
						prod = true; }}},400.400);
			TimeUnit.SECONDS.sleep(5);
			timer.cancel();
			System.out.println("Time cancle");
			TimeUnit.MILLISECONDS.sleep(500);
		    System.out.println("Task2.blocker.prodAll() ");
		    Task2.blocker.prodAll();
		    TimeUnit.MILLISECONDS.sleep(500);
		    System.out.println("\nShutting down");
		    service.shutdownNow(); // Interrupt all tasks}}Copy the code

Test results:

Notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-3,5,main] NotifyAll Thread[pool-1-thread-1,5,main] Notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] NotifyAll Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] notifyAll Thread [- Thread pool - 1-2, 5, the main] Thread [- Thread pool - 1-3, 5, the main] Thread [- Thread pool - 1-1, 5, the main] notify Thread [- Thread pool - 1-2, 5, the main] notifyAll Thread [- Thread pool - 1-2, 5, the main] Thread [- Thread pool - 1-1, 5, the main] NotifyAll Thread[pool-1-thread-3,5,main] Notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-3,5,main] NotifyAll Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify Thread[pool-1-thread-2,5,main] notifyAll Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] Time cancle Task2. Blocker. ProdAll () Thread [- Thread pool - 1-4, 5, the main] Shutting downCopy the code

As you can see from the output above, we started three Task threads, one Task2 thread. Using timer, you make a timer that starts the notify() and notifyAll() methods of Task.Blocker alternately every 4 milliseconds. We can see that both Task and Task2 have Blocker objects that block when they call the Blocker. We see that when task.prod () is called only one of the pending tasks wakes up, and the other two continue to hang. All three waiting threads are woken up when task.prodall () is called. When Task2 is called. On prodAll(), only the Task2 thread tasks are awakened. The remaining three Task tasks continue to hang.

Producers and consumers

Consider the situation where there is a cook and a waiter in a restaurant. The waiter had to wait for the cook to prepare the meal. When the chef is ready, he calls the waiter, who serves the food and returns to wait. This is an example of task collaboration: the chef represents the producer and the waiter represents the consumer. The two tasks must shake hands as the meal is produced and consumed, and the system must be shut down in an orderly manner.

Meal types:

public class Meal {
	private final int orderNum;
	public Meal(int orderNum) { this.orderNum = orderNum; }
    public String toString(a) { return "Meal "+ orderNum; }}Copy the code

Waiter:

public class WaitPerson implements Runnable {
	  private Restaurant restaurant;
	  public WaitPerson(Restaurant r) {
		  restaurant = r;
	  }

	  public void run(a) {
	    try {
	      while(! Thread.interrupted()) {synchronized(this) {
	          while(restaurant.meal == null)
	            wait(); // ... for the chef to produce a meal
	        }
	        Print.print("Waitperson got " + restaurant.meal);
	        synchronized(restaurant.chef) {
	          restaurant.meal = null;
	          restaurant.chef.notifyAll(); // Ready for another}}}catch(InterruptedException e) {
	    	Print.print("WaitPerson interrupted"); }}}Copy the code

Cook class:

public class Chef implements Runnable {
	  private Restaurant restaurant;
	  private int count = 0;
	  public Chef(Restaurant r) {
		  restaurant = r;
	  }
	  public void run(a) {
	    try {
	      while(! Thread.interrupted()) {synchronized(this) {
	          while(restaurant.meal ! =null)
	            wait(); // ... for the meal to be taken
	        }
	        if(++count == 10) {
	        	Print.print("Out of food, closing");
	          restaurant.exec.shutdownNow();
	        }
	        Print.printnb("Order up! ");
	        synchronized(restaurant.waitPerson) {
	          restaurant.meal = new Meal(count);
	          restaurant.waitPerson.notifyAll();
	        }
	        TimeUnit.MILLISECONDS.sleep(100); }}catch(InterruptedException e) {
	    	Print.print("Chef interrupted"); }}}Copy the code

The test class:

public class Restaurant {
	  Meal meal;
	  ExecutorService exec = Executors.newCachedThreadPool();
	  WaitPerson waitPerson = new WaitPerson(this);
	  Chef chef = new Chef(this);
	  public Restaurant() {
	    exec.execute(chef);
	    exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); }}Copy the code

Execution Result:

Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
Order up! WaitPerson interrupted
Chef interrupted

Copy the code

** Uses the displayed Lock and Condition objects

There are additional display tools in the Java SE5 class library. Let’s rewrite our waxing and polishing class. The basic class that uses mutex and allows tasks to hang is Condition, and you can suspend a task by calling await() on Condition. When an external Condition changes, meaning that a task should continue, you can wake up a task by calling signal() to notify the task, or by calling signalAll() to wake up all pending tasks on the Condition. (signalAll() is a safer way than notifAll())

Here is the rewritten version:

class Car {
  private Lock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();
  private boolean waxOn = false;
  public void waxed(a) {
    lock.lock();
    try {
      waxOn = true; // Ready to buff
      condition.signalAll();
    } finally{ lock.unlock(); }}public void buffed(a) {
    lock.lock();
    try {
      waxOn = false; // Ready for another coat of wax
      condition.signalAll();
    } finally{ lock.unlock(); }}public void waitForWaxing(a) throws InterruptedException {
    lock.lock();
    try {
      while(waxOn == false)
        condition.await();
    } finally{ lock.unlock(); }}public void waitForBuffing(a) throws InterruptedException{
    lock.lock();
    try {
      while(waxOn == true)
        condition.await();
    } finally{ lock.unlock(); }}}class WaxOn implements Runnable {
  private Car car;
  public WaxOn(Car c) { car = c; }
  public void run(a) {
    try {
      while(! Thread.interrupted()) { printnb("Wax On! ");
        TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); }}catch(InterruptedException e) {
      print("Exiting via interrupt");
    }
    print("Ending Wax On task"); }}class WaxOff implements Runnable {
  private Car car;
  public WaxOff(Car c) { car = c; }
  public void run(a) {
    try {
      while(! Thread.interrupted()) { car.waitForWaxing(); printnb("Wax Off! ");
        TimeUnit.MILLISECONDS.sleep(200); car.buffed(); }}catch(InterruptedException e) {
      print("Exiting via interrupt");
    }
    print("Ending Wax Off task"); }}public class WaxOMatic2 {
  public static void main(String[] args) throws Exception {
    Car car = new Car();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new WaxOff(car));
    exec.execute(new WaxOn(car));
    TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}Copy the code

A single Lock in the Car constructor produces a Condition object, which is used to manage communication between tasks. But the Condition does not contain any information about the processing state, so you need additional information about the processing state, Boolean waxOn.

Producer consumers and queues

The wait() and notifAll() methods solve the problem of task interoperation in a very low-level way, by shaking hands on every interaction. Many times we can solve the problem of collaboration by using synchronous queues, which allow only one task to insert or remove elements at any one time. In Java. Util. Concurrent. BlockingQueue interface provides the queue, the interface has a lot of standard implementation. You can use LinkedBlockingQueue, which is an unbounded queue, or ArrayBlockingQueue, which has a fixed size and can put a finite number of elements into it before it blocks.

If a consumer task tries to fetch an object from the queue and the queue is empty, the queues can suspend the task and resume the consuming task when more elements are available. Blocking queues can solve very large problems in a simple and reliable way compared to wait() and notifyAll().

Here is a simple test that serializes the execution of multiple LiftOff objects. The consumer LiftOffRunner pushes each LiftOff object out of the BlockIngQueue and runs it directly. Instead of starting a thread for each task, it runs with its own thread through the displayed call to run().

First, post the LiftOff class you wrote earlier:

public class LiftOff implements Runnable{
	  protected int countDown = 10; // Default
	  private static int taskCount = 0;
	  private final int id = taskCount++;
	  public LiftOff(a) {}
	  public LiftOff(int countDown) {
	    this.countDown = countDown;
	  }
	  public String status(a) {
	    return "#" + id + "(" +
	      (countDown > 0 ? countDown : "Liftoff!") + "),";
	  }
	  public void run(a) {
	    while(countDown-- > 0) { System.out.print(status()); Thread.yield(); }}}Copy the code

LiftOffRunner class:

public class LiftOffRunner implements Runnable{
	private BlockingQueue<LiftOff> rockets;

	protected LiftOffRunner(BlockingQueue<LiftOff> rockets) {
		super(a);this.rockets = rockets;
	}
    public void add(LiftOff lo) {
		try {
			rockets.put(lo);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("Add failed"); }}@Override
	public void run(a) {
		// TODO Auto-generated method stub
		try {
			while (!Thread.interrupted()) {
				LiftOff rocket = rockets.take();
				rocket.run();
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("Operation interruption");
		}
		System.out.println("Exit run"); }}Copy the code

Finally, the test classes:

public class TestBlockingQueues {
	static void getkey(a){
		try {
			new BufferedReader(new InputStreamReader(System.in)).readLine();
		} catch (IOException e) {
			// TODO Auto-generated catch blocke.printStackTrace(); }}static void getkey(String message) {
	    Print.print(message);
	    getkey();
	  }

	static void test(String msg,BlockingQueue<LiftOff> queue){
		LiftOffRunner runner = new LiftOffRunner(queue);
		Thread thread = new Thread(runner);
		thread.start();
		// Start, but the content is empty, so it is suspended, waiting for new content to enter
		for (int i = 0; i < 5; i++) {
			runner.add(new LiftOff(5));
		}
		getkey("Press Enter "+ msg);
		thread.interrupt();

	}

	public static void main(String[] args) {
		test("LinkedBlockingQueue".new LinkedBlockingQueue<LiftOff>());
		test("ArrayBlockingQueue".new ArrayBlockingQueue<>(3));
		test("SynchronousQueue".newSynchronousQueue<>()); }}Copy the code

Toast BlockingQueue

Here’s an example where each machine has three tasks: one to make only toast, one to butter toast, and one to spread jam on buttered toast. If you use BlockIngQueue to run this example:

class Toast {
  public enum Status { DRY, BUTTERED, JAMMED }
  private Status status = Status.DRY;
  private final int id;
  public Toast(int idn) { id = idn; }
  public void butter(a) { status = Status.BUTTERED; }
  public void jam(a) { status = Status.JAMMED; }
  public Status getStatus(a) { return status; }
  public int getId(a) { return id; }
  public String toString(a) {
    return "Toast " + id + ":"+ status; }}class ToastQueue extends LinkedBlockingQueue<Toast> {}

class Toaster implements Runnable {
  private ToastQueue toastQueue;
  private int count = 0;
  private Random rand = new Random(47);
  public Toaster(ToastQueue tq) { toastQueue = tq; }
  public void run(a) {
    try {
      while(! Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
        // Make toast
        Toast t = new Toast(count++);
        print(t);
        // Insert into queuetoastQueue.put(t); }}catch(InterruptedException e) {
      print("Toaster interrupted");
    }
    print("Toaster off"); }}// Apply butter to toast:
class Butterer implements Runnable {
  private ToastQueue dryQueue, butteredQueue;
  public Butterer(ToastQueue dry, ToastQueue buttered) {
    dryQueue = dry;
    butteredQueue = buttered;
  }
  public void run(a) {
    try {
      while(! Thread.interrupted()) {// Blocks until next piece of toast is available:Toast t = dryQueue.take(); t.butter(); print(t); butteredQueue.put(t); }}catch(InterruptedException e) {
      print("Butterer interrupted");
    }
    print("Butterer off"); }}// Apply jam to buttered toast:
class Jammer implements Runnable {
  private ToastQueue butteredQueue, finishedQueue;
  public Jammer(ToastQueue buttered, ToastQueue finished) {
    butteredQueue = buttered;
    finishedQueue = finished;
  }
  public void run(a) {
    try {
      while(! Thread.interrupted()) {// Blocks until next piece of toast is available:Toast t = butteredQueue.take(); t.jam(); print(t); finishedQueue.put(t); }}catch(InterruptedException e) {
      print("Jammer interrupted");
    }
    print("Jammer off"); }}// Consume the toast:
class Eater implements Runnable {
  private ToastQueue finishedQueue;
  private int counter = 0;
  public Eater(ToastQueue finished) {
    finishedQueue = finished;
  }
  public void run(a) {
    try {
      while(! Thread.interrupted()) {// Blocks until next piece of toast is available:
        Toast t = finishedQueue.take();
        // Verify that the toast is coming in order,
        // and that all pieces are getting jammed:
        if(t.getId() ! = counter++ || t.getStatus() ! = Toast.Status.JAMMED) { print(">>>> Error: " + t);
          System.exit(1);
        } else
          print("Chomp! "+ t); }}catch(InterruptedException e) {
      print("Eater interrupted");
    }
    print("Eater off"); }}public class ToastOMatic {
  public static void main(String[] args) throws Exception {
    ToastQueue dryQueue = new ToastQueue(),
               butteredQueue = new ToastQueue(),
               finishedQueue = new ToastQueue();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Toaster(dryQueue));
    exec.execute(new Butterer(dryQueue, butteredQueue));
    exec.execute(new Jammer(butteredQueue, finishedQueue));
    exec.execute(new Eater(finishedQueue));
    TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}Copy the code

There is no synchronization shown in this example, because the design of the synchronization queue and system implicitly manages that only one task is active on each Toast at any one time. The process will be suspended and resumed automatically because the queue is blocked.

My blog

My wechat official number: Android development boast

Scan Attention is a free set of Java programming thought notes