Preface
In day-to-day development you will inevitably run into scenarios where the main thread must wait for all child threads to finish before it continues with some logic.
Or thread A needs to tell thread B to do something once a certain condition is reached.
This can be done in the following ways:
Wait/notify mechanism
Wait/notify is the classic thread communication pattern in Java.
Two threads communicate by calling wait() and notify() on the same object.
For example, two threads alternately print odd and even numbers:
public class TwoThreadWaitNotify {
    private int start = 1;
    private boolean flag = false;

    public static void main(String[] args) {
        TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();
        Thread t1 = new Thread(new OuNum(twoThread));
        t1.setName("A");
        Thread t2 = new Thread(new JiNum(twoThread));
        t2.setName("B");
        t1.start();
        t2.start();
    }

    /** Even-number thread. */
    public static class OuNum implements Runnable {
        private TwoThreadWaitNotify number;

        public OuNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (TwoThreadWaitNotify.class) {
                    // Check the bound while holding the lock, so a stale read cannot print 101
                    if (number.start > 100) {
                        break;
                    }
                    System.out.println("Even thread grabbed the lock.");
                    if (number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+even" + number.start);
                        number.start++;
                        number.flag = false;
                        TwoThreadWaitNotify.class.notify();
                    } else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /** Odd-number thread. */
    public static class JiNum implements Runnable {
        private TwoThreadWaitNotify number;

        public JiNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (TwoThreadWaitNotify.class) {
                    // Same bound check under the lock as in the even-number thread
                    if (number.start > 100) {
                        break;
                    }
                    System.out.println("Odd thread grabbed the lock.");
                    if (!number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+odd" + number.start);
                        number.start++;
                        number.flag = true;
                        TwoThreadWaitNotify.class.notify();
                    } else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}
Output result (last lines of a run; this excerpt labels the threads t1 and t2):
t2+-+odd93
t1+-+even94
t2+-+odd95
t1+-+even96
t2+-+odd97
t1+-+even98
t2+-+odd99
t1+-+even100
Here thread A and thread B both acquire the lock of the same object, TwoThreadWaitNotify.class. Thread A calls wait() on that synchronization object, which releases the lock and puts the thread into the WAITING state.
Thread B calls notify(), so that thread A can return from wait() after receiving the notification.
The communication is carried out through the TwoThreadWaitNotify.class object.
A few things to note:
- wait(), notify(), and notifyAll() may only be called by a thread that already holds the object's lock (also known as the object monitor).
- After calling wait(), the thread releases the lock and enters the WAITING state, and it is moved to the waiting queue.
- Calling notify() moves a thread from the waiting queue to the synchronization queue, and the thread's state is updated to BLOCKED.
- Returning from wait() requires that the thread which called notify() releases the lock and that the waiting thread reacquires it.
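The first two rules can be observed directly. The following is a minimal sketch rather than code from the article: the class name MonitorStateDemo and its method names are made up for illustration. It checks that notify() without the lock throws IllegalMonitorStateException, and that a thread parked in wait() reports Thread.State.WAITING.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative sketch; the class and method names are hypothetical. */
class MonitorStateDemo {
    private static final Object LOCK = new Object();

    /** Returns true if notify() without holding the lock is rejected. */
    static boolean notifyWithoutLockFails() {
        try {
            LOCK.notify(); // not inside synchronized (LOCK)
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Starts a thread that waits on LOCK and returns the state observed while it waits. */
    static Thread.State stateWhileWaiting() {
        boolean[] released = {false}; // guarded by LOCK
        Thread waiter = new Thread(() -> {
            synchronized (LOCK) {
                while (!released[0]) {
                    try {
                        LOCK.wait(); // releases LOCK and parks this thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "waiter");
        waiter.start();

        // Poll (for at most five seconds) until the waiter is parked inside wait().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Thread.State observed = waiter.getState();
        while (observed != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.yield();
            observed = waiter.getState();
        }

        synchronized (LOCK) {
            released[0] = true;
            LOCK.notify(); // the waiter now has to reacquire LOCK before wait() returns
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("notify without lock rejected: " + notifyWithoutLockFails());
        System.out.println("state while waiting: " + stateWhileWaiting());
    }
}
```

The BLOCKED state is much harder to catch reliably, because the woken thread only stays in it until the notifier leaves its synchronized block, so the sketch does not try to observe it.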
Wait/notify has a classic paradigm:
Thread A as the consumer:
- Acquire the object's lock.
- Enter a while loop on the condition and call wait() inside it.
- Once the condition is satisfied, leave the loop and execute the actual processing logic.
Thread B as the producer:
- Acquire the object's lock.
- Change the condition shared with thread A.
- Call notify().
The pseudocode is as follows:
// Thread A
synchronized (Object) {
    while (condition) {
        Object.wait();
    }
    // do something
}

// Thread B
synchronized (Object) {
    condition = false; // change the condition
    Object.notify();
}
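The pseudocode can be turned into a small runnable class. This is a sketch under stated assumptions: GuardedWaitDemo, awaitPayload(), publish(), and the ready/payload fields are hypothetical names chosen for illustration. The condition here is written as "not ready yet", so the producer sets it to true instead of false.

```java
/** Illustrative sketch of the wait/notify paradigm; all names are hypothetical. */
class GuardedWaitDemo {
    private final Object lock = new Object();
    private boolean ready = false; // the shared condition, guarded by lock
    private String payload;        // guarded by lock

    /** Consumer side: wait in a loop until the condition holds, then proceed. */
    String awaitPayload() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {   // a loop, not an if: wait() may return spuriously
                lock.wait();   // releases the lock while waiting
            }
            return payload;    // the "do something" step
        }
    }

    /** Producer side: change the condition under the lock, then notify. */
    void publish(String value) {
        synchronized (lock) {
            payload = value;
            ready = true;
            lock.notify();
        }
    }

    /** Runs one consumer and one producer and returns what the consumer received. */
    static String demo() {
        GuardedWaitDemo box = new GuardedWaitDemo();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = box.awaitPayload();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        box.publish("hello");
        try {
            consumer.join(); // after join(), the consumer's write to received[0] is visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the consumer rechecks the condition under the lock, the result is the same whether publish() runs before or after the consumer reaches wait(). With more than one waiting thread, notifyAll() is usually the safer call.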
The join() method
private static void join() throws InterruptedException {
    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running2");
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    t1.start();
    t2.start();
    // Wait for thread 1 to terminate
    t1.join();
    // Wait for thread 2 to terminate
    t2.join();
    LOGGER.info("main over");
}
Output result:
2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over
Join () blocks until T1 completes, so eventually the main thread waits for T1 and T2 to complete.
Join () also uses wait notification:
Core logic:
while (isAlive()) {
wait(0);
}
Copy the code
The notifyAll() method is called after the join thread completes, which is called in the JVM implementation, so it is not visible here.
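To make the hidden notifyAll() visible, the same mechanism can be imitated in plain Java. This is only a sketch, not how Thread itself is written: the Worker class, its alive field, and awaitDone() are hypothetical stand-ins, and the worker issues the notifyAll() that the JVM would issue for a real thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative imitation of the join() mechanism; all names are hypothetical. */
class JoinSketch {
    /** A task that signals its own completion, standing in for a Thread. */
    static class Worker implements Runnable {
        private final Runnable body;
        private boolean alive = true; // guarded by this

        Worker(Runnable body) {
            this.body = body;
        }

        @Override
        public void run() {
            try {
                body.run();
            } finally {
                synchronized (this) {
                    alive = false;
                    notifyAll(); // for a real Thread, the JVM issues this when the thread exits
                }
            }
        }

        /** Same shape as the join() loop: wait for as long as the worker is alive. */
        synchronized void awaitDone() throws InterruptedException {
            while (alive) {
                wait(0);
            }
        }
    }

    /** Runs a worker that stores 42, waits for it, and returns the stored value. */
    static int demo() {
        AtomicInteger result = new AtomicInteger();
        Worker worker = new Worker(() -> result.set(42));
        new Thread(worker, "worker").start();
        try {
            worker.awaitDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```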
volatile shared memory
Since Java threads communicate through shared memory, the main thread can stop thread A as follows:
import java.util.concurrent.TimeUnit;

public class Volatile implements Runnable {
    private static volatile boolean flag = true;

    @Override
    public void run() {
        while (flag) {
            System.out.println(Thread.currentThread().getName() + " is running...");
        }
        System.out.println(Thread.currentThread().getName() + " finished");
    }

    public static void main(String[] args) throws InterruptedException {
        Volatile aVolatile = new Volatile();
        new Thread(aVolatile, "thread A").start();
        System.out.println("Main thread running");
        TimeUnit.MILLISECONDS.sleep(100);
        aVolatile.stopThread();
    }

    private void stopThread() {
        flag = false;
    }
}
Output result:
thread A is running...
thread A is running...
thread A is running...
thread A is running...
thread A finished
The flag is stored in main memory, so both the main thread and thread A can see it.
flag is declared volatile mainly to guarantee memory visibility; more on this can be found here.
CountDownLatch concurrency tool
CountDownLatch achieves the same effect as join(), but with more flexibility.
private static void countDownLatch() throws Exception {
    int thread = 3;
    long start = System.currentTimeMillis();
    final CountDownLatch countDown = new CountDownLatch(thread);
    for (int i = 0; i < thread; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    Thread.sleep(2000);
                    countDown.countDown();
                    LOGGER.info("thread end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    countDown.await();
    long stop = System.currentTimeMillis();
    LOGGER.info("main over total time={}", stop - start);
}
Output result:
2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012
CountDownLatch is also built on AQS (AbstractQueuedSynchronizer); for implementation details, refer to the related material on how AQS works.
- The number of concurrent threads is passed in when the CountDownLatch is initialized, and each thread calls countDown() once it finishes its work.
- That method decrements the built-in AQS state by 1.
- Finally, the main thread calls await(), which blocks until state == 0 and then returns.
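The three steps can be condensed into a short sketch. LatchCountDemo and runWorkers() are hypothetical names, and getCount() is used only to show that the count has dropped to zero by the time await() returns. Unlike the listing above, countDown() is placed in a finally block here, which is a design choice of this sketch: a worker that fails still releases the waiting thread.

```java
import java.util.concurrent.CountDownLatch;

/** Illustrative sketch; the class and method names are hypothetical. */
class LatchCountDemo {
    /** Starts the given number of workers, waits for all of them, and returns the remaining count. */
    static long runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers); // count starts at `workers`
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    // ... the real work would go here ...
                } finally {
                    latch.countDown(); // decrements the count by 1, even if the work failed
                }
            }, "worker-" + i).start();
        }
        try {
            latch.await(); // blocks until the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("remaining count: " + runWorkers(3));
    }
}
```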
CyclicBarrier concurrency tool
private static void cyclicBarrier() throws Exception {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOGGER.info("thread end do something");
        }
    }).start();
    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOGGER.info("thread end do something");
        }
    }).start();
    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                Thread.sleep(5000);
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOGGER.info("thread end do something");
        }
    }).start();
    LOGGER.info("main thread");
}
CyclicBarrier can also be used for communication between threads.
It lets N threads wait until all of them have reached a certain state, and then continue running.
- First, the barrier is initialized with the number of participating threads.
- A call to await() blocks until all participating threads have called it.
- Once every participant has called await(), all threads return from await() and continue with the subsequent logic.
Running results:
2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something
You can see that because one of the threads sleeps for five seconds, all the other threads have to wait until that thread calls await().
This tool provides functionality similar to CountDownLatch, but with more flexibility. You can even call reset() to reset the CyclicBarrier (in which case you have to catch and handle BrokenBarrierException yourself) and then run it again.
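One aspect of that flexibility is that a barrier can be reused without any reset at all: after all parties arrive, it is ready for the next round. The sketch below illustrates this under stated assumptions. BarrierReuseDemo and runRounds() are hypothetical names, and the barrier action (the optional Runnable passed to the constructor) is used only to count how many times the barrier tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch; the class and method names are hypothetical. */
class BarrierReuseDemo {
    /** Runs `parties` threads through `rounds` barrier rounds and returns how often the barrier tripped. */
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier instance is reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out; skip the remaining rounds.
                }
            }, "party-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 2) + " times");
    }
}
```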
Responding to thread interrupts
import java.util.concurrent.TimeUnit;

public class StopThread implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // The thread executes its concrete logic here
            System.out.println(Thread.currentThread().getName() + " is running...");
        }
        System.out.println(Thread.currentThread().getName() + " exited");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread(), "thread A");
        thread.start();
        System.out.println("Main thread running");
        TimeUnit.MILLISECONDS.sleep(10);
        thread.interrupt();
    }
}
Output result:
thread A is running...
thread A is running...
thread A exited
You can also communicate by interrupting a thread. Calling thread.interrupt() merely sets a flag in the thread to true.
Calling this method does not by itself stop the thread. If the thread never responds to the flag (here the loop condition checks it), the call has no effect.
Note, however, that when an InterruptedException is thrown, the JVM resets the flag to false.
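The reset is easy to miss, so here is a minimal sketch that makes it observable. InterruptFlagDemo and observeFlags() are hypothetical names. The worker records the flag right after catching InterruptedException, restores it with Thread.currentThread().interrupt(), and records it again.

```java
/** Illustrative sketch; the class and method names are hypothetical. */
class InterruptFlagDemo {
    /** Returns {flag right after the catch, flag after restoring it}, as seen by the worker. */
    static boolean[] observeFlags() {
        boolean[] flags = new boolean[2];
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // interrupted long before this elapses
            } catch (InterruptedException e) {
                flags[0] = Thread.currentThread().isInterrupted(); // cleared when the exception was thrown
                Thread.currentThread().interrupt();                // restore it for code further up the stack
                flags[1] = Thread.currentThread().isInterrupted();
            }
        }, "worker");
        worker.start();
        worker.interrupt();
        try {
            worker.join(); // after join(), the worker's writes to flags are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flags;
    }

    public static void main(String[] args) {
        boolean[] flags = observeFlags();
        System.out.println("after catch: " + flags[0] + ", after restore: " + flags[1]);
    }
}
```

Restoring the flag matters when a loop like the one in StopThread also contains a blocking call: without the restore, the loop condition would never see the interrupt.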
The thread pool awaitTermination() method
If you use a thread pool to manage threads, you can use the following method to make the main thread wait for all tasks in the thread pool to complete:
private static void executorService() throws Exception {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MILLISECONDS, queue);
    poolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    poolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    poolExecutor.shutdown();
    // Poll once per second until every submitted task has finished
    while (!poolExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
        LOGGER.info("Thread still executing...");
    }
    LOGGER.info("main over");
}
Output result:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - Thread still executing...
2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - Thread still executing...
2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over
awaitTermination() is only useful once the thread pool has been shut down, for example by calling shutdown().
After shutdown() is called, the thread pool stops accepting new tasks and winds down gracefully, letting the tasks already in the pool run to completion.
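Both halves of that statement can be checked with a small sketch. ShutdownDemo and run() are hypothetical names, and the sketch assumes the default rejection policy of the pools created by Executors, which throws RejectedExecutionException. It submits more tasks than there are threads, shuts the pool down, tries one more submission, and then waits for termination.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch; the class and method names are hypothetical. */
class ShutdownDemo {
    /** Returns a summary such as "completed=4 rejected=true". */
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.execute(completed::incrementAndGet); // two of these may still be queued at shutdown
        }
        pool.shutdown(); // no new tasks from here on; submitted ones still run

        boolean rejected = false;
        try {
            pool.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = true; // the late task never runs
        }

        try {
            while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                // still draining the queue
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "completed=" + completed.get() + " rejected=" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```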
Pipe communication
public static void piped() throws IOException {
    // Character-oriented; PipedInputStream is byte-oriented
    PipedWriter writer = new PipedWriter();
    PipedReader reader = new PipedReader();
    // Connect the writer to the reader
    writer.connect(reader);
    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running");
            try {
                for (int i = 0; i < 10; i++) {
                    writer.write(i + "");
                    Thread.sleep(10);
                }
            } catch (Exception e) {
                // ignored in this demo
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running2");
            int msg = 0;
            try {
                // read() returns -1 once the writer has been closed
                while ((msg = reader.read()) != -1) {
                    LOGGER.info("msg={}", (char) msg);
                }
            } catch (Exception e) {
                // ignored in this demo
            }
        }
    });
    t1.start();
    t2.start();
}
Output result:
2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9
Although Java threads mainly communicate through shared memory, pipes can be used as well.
Note that the input and output streams must be connected first; only then can thread B receive the messages sent by thread A.
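The byte-oriented counterpart mentioned in the code comment works the same way. The following sketch assumes Java 9 or later (for readAllBytes() and try-with-resources on an existing variable); BytePipeDemo and transfer() are hypothetical names. Here the two ends are connected through the PipedInputStream constructor instead of connect().

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustrative sketch; the class and method names are hypothetical. */
class BytePipeDemo {
    /** Sends the message through a byte pipe from a writer thread and returns what was read. */
    static String transfer(String message) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out); // connects the two ends
            Thread writer = new Thread(() -> {
                try (out) { // closing the output end signals end-of-stream to the reader
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, "writer");
            writer.start();
            try (in) {
                // Blocks until the writer closes its end, then returns everything that was written.
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer("ping"));
    }
}
```

As with PipedWriter and PipedReader, the two ends are meant to be used from different threads; reading and writing from a single thread can deadlock once the internal buffer fills up.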
In practice, choose whichever thread communication mode best fits the requirements.
Extra
I have recently been summarizing some Java-related knowledge points; anyone interested is welcome to help maintain them.
Address: github.com/crossoverJi…