This is the 25th day of my participation in the August Genwen Challenge.
CAS(Lock-free optimization, spin)
package com.anzhi.CAS;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class Atomic1 {
// Integer count = 0;
AtomicInteger count = new AtomicInteger(0);
public void test() { // no synchronized needed: AtomicInteger updates are already atomic
for(int i=0; i<10000; i++) count.incrementAndGet(); // the equivalent of count++
}
public static void main(String[] args) {
Atomic1 t = new Atomic1();
List<Thread> list = new ArrayList<>();
for(int i=0; i<10; i++){
list.add(new Thread(t::test, "t-"+i));
}
for (Thread thread : list) {
thread.start();
}
for(Thread thread : list){
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(t.count); // prints 100000: 10 threads x 10000 increments
}
}
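How it works
In JDK 1.8, incrementAndGet delegates to Unsafe.getAndAddInt, which re-reads the current value and retries the CAS in a loop until it succeeds (this is the spin):
do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));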
Compare and Swap
cas(V, Expected, NewValue)
if V == Expected
V = NewValue
otherwise retry or fail // V no longer matches the expected value that the caller read earlier
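CAS is supported by a CPU primitive: the compare and the swap execute as one atomic hardware instruction (for example, lock cmpxchg on x86).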
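To make the retry loop concrete, here is a minimal sketch of a hand-written CAS spin on top of AtomicInteger.compareAndSet. The class name CasSpinSketch and the helper doubleAtomically are made up for this illustration; they are not part of the JDK or of the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinSketch {
    // Hypothetical helper: doubles the value atomically with an explicit CAS retry loop.
    static int doubleAtomically(AtomicInteger value) {
        while (true) {
            int expected = value.get();      // read the current value
            int updated = expected * 2;      // compute the new value from it
            if (value.compareAndSet(expected, updated)) {
                return updated;              // nobody changed the value in between
            }
            // Another thread won the race: loop and try again with a fresh read.
        }
    }

    // Ten threads each double the value once, so the result is 1 * 2^10.
    static int runDemo() {
        AtomicInteger value = new AtomicInteger(1);
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> doubleAtomically(value));
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return value.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 1024
    }
}
```

No thread ever blocks here: a thread that loses the race simply re-reads and retries, which is why CAS is described as lock-free.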
The ABA problem (harmless for primitive values, but it can matter for object references)
Suppose a thread reads the value 1 and expects it to still be 1 when it performs its CAS. In between, another thread changes the value to 2 and then back to 1. The CAS succeeds because the value matches, so the intermediate change to 2 goes unnoticed. To detect every change, attach a version number to the value and compare the version as well.
I saw this example on Zhihu:
Two ATM threads both try to apply the same withdrawal of 50 from Xiaoming's balance of 100. Thread 1 (ATM): reads the current value 100 and expects to update it to 50. Thread 2 (ATM): reads the current value 100 and expects to update it to 50. Thread 1 succeeds, and thread 2 blocks for some reason. Then someone transfers 50 to Xiaoming. Thread 3: reads the current value 50 and updates it to 100, so the balance is 100 again. Thread 2 then resumes, sees that the balance is still the 100 it expected, and its CAS succeeds, setting the balance to 50. The balance should be 100 (100-50+50), but it is actually 50 (100-50+50-50). This wrongly successful update is caused by the ABA problem.
Source: https://www.zhihu.com/question/23281499/answer/119172750. Copyright belongs to the author on Zhihu. For commercial reprints, contact the author for authorization; for non-commercial reprints, indicate the source.
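The version-number fix can be sketched with the JDK's AtomicStampedReference, which compares a stamp together with the value. The class AbaStampSketch below is a hypothetical name, and the three "threads" from the ATM story are replayed one after another on a single thread so the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaStampSketch {
    // Returns true if the stale update from "thread 2" is rejected.
    static boolean staleUpdateRejected() {
        // Balance 100 with stamp (version) 0. Integer values from -128 to 127 are cached,
        // so the reference comparison inside compareAndSet is safe for 50 and 100.
        AtomicStampedReference<Integer> balance = new AtomicStampedReference<>(100, 0);

        // "Thread 2" reads value 100 and stamp 0, then stalls.
        int staleStamp = balance.getStamp();

        // "Thread 1": withdraw 50 (100 -> 50), stamp 0 -> 1.
        balance.compareAndSet(100, 50, 0, 1);
        // "Thread 3": deposit 50 (50 -> 100), stamp 1 -> 2.
        balance.compareAndSet(50, 100, 1, 2);

        // "Thread 2" resumes. The value is 100 again, but the stamp is now 2, not 0,
        // so the CAS fails instead of silently withdrawing a second time.
        boolean applied = balance.compareAndSet(100, 50, staleStamp, staleStamp + 1);
        return !applied && balance.getReference() == 100;
    }

    public static void main(String[] args) {
        System.out.println("stale update rejected: " + staleUpdateRejected());
    }
}
```

With a plain AtomicInteger the last CAS would succeed, which is exactly the failure in the ATM example.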
Efficiency tests: Atomic, Synchronized, LongAdder
package com.anzhi.CAS;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
public class Atomic2 {
// Integer count = 0;
static long count1 = 0L;
static AtomicInteger count2 = new AtomicInteger(0);
static LongAdder count3 = new LongAdder();
public static void main(String[] args) {
Thread[] threads = new Thread[1000];
System.out.println("-----------------------Atomic------------------------");
for(int i=0; i<threads.length; i++){
threads[i] = new Thread(()->{
for(int j = 0; j<100000; j++){ count2.incrementAndGet(); }
});
}
long start = System.currentTimeMillis();
for(Thread thread : threads){
thread.start();
}
for(Thread thread : threads){
try {
thread.join();
} catch(InterruptedException e) { e.printStackTrace(); }
}
long end = System.currentTimeMillis();
System.out.println("Atomic: "+count2+" time: "+(end-start));
System.out.println("-----------------------synchronized------------------------");
Object o = new Object();
for(int i=0; i<threads.length; i++){
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for(int j=0; j<100000; j++) {
synchronized(o) { count1++; }
}
}
});
}
long start1 = System.currentTimeMillis();
for(Thread thread : threads){
thread.start();
}
for(Thread thread : threads){
try {
thread.join();
} catch(InterruptedException e) { e.printStackTrace(); }
}
long end1 = System.currentTimeMillis();
System.out.println("synchronized: "+count1+" time: "+(end1-start1)); // print count1, the counter this test incremented
System.out.println("-------------------LongAdder----------------------------");
for(int i=0; i<threads.length; i++){
threads[i] = new Thread(()->{
for(int j = 0; j<100000; j++){ count3.increment(); }
});
}
long start2 = System.currentTimeMillis();
for(Thread thread : threads){
thread.start();
}
for(Thread thread : threads){
try {
thread.join();
} catch(InterruptedException e) { e.printStackTrace(); }
}
long end2 = System.currentTimeMillis();
System.out.println("LongAdder: "+count3+" time: "+(end2-start2));
}
}
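LongAdder is efficient under heavy contention because it splits the counter into several cells: each thread updates one cell with CAS, so threads rarely collide on the same memory location, and sum() adds the cells together to give the total.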
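The idea of spreading one counter over several cells can be sketched as follows. This is a simplified illustration, not the JDK's LongAdder implementation: the class StripedCounterSketch, the fixed number of stripes, and choosing a cell by thread id are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        cells = new AtomicLongArray(stripes);
    }

    // Each thread increments "its" cell, so threads mostly touch different slots.
    void increment() {
        int index = (int) (Thread.currentThread().getId() % cells.length());
        cells.incrementAndGet(index);
    }

    // The total is the sum over all cells.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    // 16 threads x 10000 increments each.
    static long runDemo() {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] threads = new Thread[16];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 160000
    }
}
```

The trade-off is that reading the total costs a pass over all cells, so this layout suits counters that are written far more often than they are read.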
Newer lock types
ReentrantLock
ReentrantLock is an exclusive lock. You lock and unlock it manually, which takes more care but is more flexible.
synchronized is also an exclusive lock. Locking and unlocking happen automatically, which is easier to use but less flexible.
package com.anzhi.Reentrantlock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Reentrantlock1 {
private static final Lock lock = new ReentrantLock(true); // true = fair lock: the longest-waiting thread acquires it first; the default (false) is nonfair and guarantees no order
public void test(){
lock.lock();
System.out.println("Thread:"+Thread.currentThread().getName()+"Lock obtained");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.out.println("Thread:"+Thread.currentThread().getName()+"Release the lock.");
}
}
public static void main(String[] args) {
Reentrantlock1 t = new Reentrantlock1();
Reentrantlock1 t2 = new Reentrantlock1();
new Thread(()->t.test(), "t").start();
new Thread(()->t2.test(), "t2").start();
}
}
tryLock
package com.anzhi.Reentrantlock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Reentrantlock1 {
private static final Lock lock = new ReentrantLock();
public void test(){
boolean locked = false;
//lock.lock();
try {
locked = lock.tryLock(2, TimeUnit.SECONDS); // Try to get the lock
System.out.println("Thread:"+Thread.currentThread().getName()+"Lock obtained:"+locked);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("Thread:"+Thread.currentThread().getName()+"Lock obtained");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(locked) {
lock.unlock();
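System.out.println("Thread:"+Thread.currentThread().getName()+"Release the lock.");
}
}
}
public static void main(String[] args) {
Reentrantlock1 t = new Reentrantlock1();
Reentrantlock1 t2 = new Reentrantlock1();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
lock.lockInterruptibly(); // waits for the lock, but gives up with InterruptedException if interrupted
try {
t.test(); // reentrant: tryLock succeeds at once because this thread already holds the lock
} finally {
lock.unlock(); // release the hold taken by lockInterruptibly()
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t").start();
new Thread(()->t2.test(), "t2").start();
}
}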
lockInterruptibly: acquire the lock unless the thread is interrupted
public void lockInterruptibly() throws InterruptedException
1) Acquires the lock unless the current thread is interrupted.
2) If the lock is not held by another thread, acquires it and returns immediately, setting the lock hold count to 1.
3) If the current thread already holds the lock, the hold count is incremented by one and the method returns immediately.
4) If the lock is held by another thread, the current thread is disabled for thread scheduling purposes and lies dormant until one of two things happens: the current thread acquires the lock, or some other thread interrupts the current thread.
5) If the current thread acquires the lock, the lock hold count is set to 1. If the current thread has its interrupted status set on entry to this method, or is interrupted while waiting to acquire the lock, InterruptedException is thrown and the current thread's interrupted status is cleared.
6) In this implementation, because this method is an explicit interruption point, responding to the interrupt takes priority over normal or reentrant acquisition of the lock.
Specified by: lockInterruptibly in interface Lock. Throws: InterruptedException if the current thread is interrupted.
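Case 4 above (a waiting thread gets interrupted) can be shown with a small sketch. The class LockInterruptiblySketch and its method name are made up for this illustration; the calling thread holds the lock for the whole test, so the waiter can only leave lockInterruptibly() through the interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptiblySketch {
    // Returns true if the waiting thread left lockInterruptibly() via InterruptedException.
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean interrupted = new AtomicBoolean(false);

        lock.lock(); // the calling thread keeps the lock, so the waiter has to queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    try {
                        // critical section; never reached in this demo
                    } finally {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    interrupted.set(true); // woken by the interrupt, not by getting the lock
                }
            }, "waiter");
            waiter.start();

            // Wait until the waiter is really queued on the lock, then interrupt it.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();

            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted: " + waiterWasInterrupted());
    }
}
```

With plain lock() the waiter would ignore the interrupt and keep waiting until the lock became free.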
CountDownLatch
CountDownLatch is a synchronization utility class that lets one or more threads wait until a set of operations in other threads has completed.
Usage:
Typical use 1: one thread waits for n threads to finish before it continues. Initialize the counter to n with new CountDownLatch(n). Each time a worker thread finishes, it decrements the counter by calling countDownLatch.countDown(). When the counter reaches 0, the threads blocked in await() on the latch are woken up. A typical scenario is service startup, where the main thread waits for several components to load before it resumes.
package com.anzhi.CountDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatch1 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) { // one worker per count, so the latch can actually reach zero
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(300);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
}
}).start();
}
System.out.println("Wait for child thread to terminate");
try {
boolean finished = latch.await(10, TimeUnit.SECONDS); // false means the timeout elapsed first
System.out.println(finished ? "Child threads finished running" : "Timed out waiting for child threads");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Typical use 2: let multiple threads start a task at the same moment, for maximum parallelism. The emphasis is on parallelism rather than concurrency: all the threads begin executing at once. It is like a race, where the runners line up at the start, wait for the starting gun, and then set off together. To do this, create a shared CountDownLatch with its counter initialized to 1. Each thread calls countDownLatch.await() before starting its task. When the main thread calls countDown(), the counter reaches 0 and all the waiting threads are released together.
package com.anzhi.CountDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatch2 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch await = new CountDownLatch(10); // one count per worker thread, so main waits for all 10
for(int i=0; i<10; i++){
new Thread(()->{
try {
latch.await();
System.out.println("Child thread"+Thread.currentThread().getName()+"Take care of your own business.");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
await.countDown();
}
}).start();
}
System.out.println("Main thread"+Thread.currentThread().getName()+"Deal with tasks");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
try {
System.out.println("Main thread processing completed");
await.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Child thread processing completed");
}
}
CyclicBarrier
A barrier is similar to a latch: both block a group of threads until some event occurs. The key difference is that with a barrier, all the threads must reach the barrier point before any of them can continue. A latch waits for events; a barrier waits for other threads.
A CyclicBarrier lets a fixed number of threads meet at the barrier point repeatedly. Each thread calls await when it reaches the barrier, and blocks until all the threads have arrived. When the last thread arrives, the barrier opens, all the threads are released, and the barrier resets so it can be used again.
package com.anzhi.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrier1 {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(20);
for(int i=0; i<100; i++){
System.out.println("Create worker thread"+i);
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"Start waiting for another thread");
barrier.await();
System.out.println(Thread.currentThread().getName()+"Commence execution");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"Executed");
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e) { e.printStackTrace(); }
}).start();
}
}
}
Application Scenarios:
Complex operations: database, network, files
Concurrent execution: thread-action
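The "cyclic" part, reusing the same barrier for several rounds, can be sketched with the two-argument CyclicBarrier constructor, whose second argument is a barrier action that runs once each time the barrier trips. The class BarrierActionSketch and the method runRounds are hypothetical names for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionSketch {
    // Runs `rounds` rounds with `parties` threads and returns how often the barrier tripped.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, after the last thread of a round arrives.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // blocks until all parties have arrived, then resets
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(4, 3)); // prints 3
    }
}
```

A CountDownLatch could not be used this way, because its counter cannot be reset once it reaches zero.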
Phaser
Java's concurrency library provides the Phaser utility class, which coordinates multiple threads that work together in phases.
package com.anzhi.Phaser;
import java.util.concurrent.Phaser;
/**
 * A Phaser has two important states: phase and party.
 * phase is the current phase number, starting at 0. When all the threads finish this round of tasks and move on to the next,
 * the current phase ends and phase is incremented by 1.
 * party is a registered thread; party=4 means the Phaser currently manages four threads.
 * An important method that often needs to be overridden is boolean onAdvance(int phase, int registeredParties). It does two things:
 * 1. It is called automatically when each phase completes, so code placed in the override runs at the end of every phase,
 *    much like the barrierAction of a CyclicBarrier.
 * 2. When it returns true, the Phaser is terminated, so the return value decides when the phased work ends.
 */
public class MyPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) { // The callback method is executed at each stage after completion
switch (phase) {
case 0:
return studentArrived();
case 1:
return finishFirstExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}
private boolean studentArrived(){
System.out.println("Students ready, number of students:"+getRegisteredParties());
return false;
}
private boolean finishFirstExercise(){
System.out.println("All students finish the first question.");
return false;
}
private boolean finishSecondExercise(){
System.out.println("All students finish question two.");
return false;
}
private boolean finishExam(){
System.out.println("All students finish question 3, end of the exam.");
return true;
}
}
package com.anzhi.Phaser;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class StudentTask implements Runnable{
private Phaser phaser;
public StudentTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"Arrival test");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"Time for question 1...");
doExercise1();
System.out.println(Thread.currentThread().getName()+"Do question 1 and complete...");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"Time for question 2...");
doExercise2();
System.out.println(Thread.currentThread().getName()+"Do question 2 and complete...");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"Time for question 3...");
doExercise3();
System.out.println(Thread.currentThread().getName()+"Do question 3 and complete...");
phaser.arriveAndAwaitAdvance();
}
private void doExercise1() {
long duration = (long)(Math.random()*10);
try {
TimeUnit.SECONDS.sleep(duration);
} catch(InterruptedException e) { e.printStackTrace(); }
}
private void doExercise2() {
long duration = (long)(Math.random()*10);
try {
TimeUnit.SECONDS.sleep(duration);
} catch(InterruptedException e) { e.printStackTrace(); }
}
private void doExercise3() {
long duration = (long)(Math.random()*10);
try {
TimeUnit.SECONDS.sleep(duration);
} catch(InterruptedException e) { e.printStackTrace(); }
}
}
package com.anzhi.Phaser;
/**
 * Problem: 5 students take an exam with three questions. All students must finish the first question before anyone starts the second, and so on.
 *
 * A Phaser has two important states, phase and party: phase is the current stage, and party is the number of threads taking part in each stage.
 * The Phaser advances to the next phase only after every registered thread has called phaser.arriveAndAwaitAdvance(); until then, the threads that have arrived block and wait.
 * In this problem, once all 5 students (threads) have called phaser.arriveAndAwaitAdvance(), they all move on to the next question.
 */
public class Main {
public static void main(String[] args) {
MyPhaser phaser = new MyPhaser();
StudentTask[] studentTask = new StudentTask[5];
for (int i = 0; i < studentTask.length; i++) {
studentTask[i] = new StudentTask(phaser);
phaser.register(); // Register once to indicate the number of threads maintained by phaser
}
Thread[] threads = new Thread[studentTask.length];
for (int i = 0; i < studentTask.length; i++) {
threads[i] = new Thread(studentTask[i], "Student "+i);
threads[i].start();
}
// Wait for all threads to finish executing
for (int i = 0; i < studentTask.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Phaser has finished:"+phaser.isTerminated());
}
}
ReadWriteLock
In Java, a Lock object makes it easy to lock and unlock, but an ordinary lock does not distinguish reads from writes, so readers exclude each other even though they could safely run together. To improve performance, Java provides read-write locks: the read lock is shared among readers, while the write lock is exclusive.
package com.anzhi.ReadWriteLock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLock {
private final Map<String,Object> cache = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock(); // Define read locks
private final Lock writeLock = lock.writeLock(); // Define write lock
public Object get(String key){
readLock.lock(); // shared lock: multiple readers can hold it at once
try {
Thread.sleep(1000);
return cache.get(key);
}catch (InterruptedException e){
e.printStackTrace();
} finally {
readLock.unlock();
}
return null;
}
public Object put(String key, Object value){
writeLock.lock();
try{
Thread.sleep(1000);
return cache.put(key, value);
}catch(InterruptedException e){
e.printStackTrace();
}finally {
writeLock.unlock();
}
return null;
}
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReadWriteLock();
for(int i=0; i<10; i++){
new Thread(()->{
readWriteLock.put("T", 1);
System.out.println(Thread.currentThread().getName()+"Done");
}).start();
}
for(int i=0; i<10; i++){ // read locks are shared, so the 10 reads can overlap instead of taking one second each
new Thread(()->{
readWriteLock.get("T");
System.out.println(Thread.currentThread().getName()+"Read complete");
}).start();
}
}
}
Semaphore
Semaphore is another thread synchronization helper. It maintains a set of permits: a thread must acquire a permit before it proceeds and releases the permit when it is done, so the number of permits caps how many threads can access a resource at the same time (rate limiting).
package com.anzhi.Semaphore;
import java.util.concurrent.Semaphore;
public class Semaphore1 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1); // one permit: only one thread can run the guarded section at a time
new Thread(()->{
try {
semaphore.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}).start();
new Thread(()->{
try{
semaphore.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
}catch (InterruptedException e){
e.printStackTrace();
}finally{ semaphore.release(); }
}).start();
}
}
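The example above uses a single permit, which behaves like a mutex. For throttling, a sketch with several permits may be more telling. SemaphoreLimitSketch and maxConcurrent are hypothetical names; the method measures the highest number of threads that were inside the guarded section at the same time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {
    // Starts threadCount threads and returns the peak number running the guarded section at once.
    static int maxConcurrent(int permits, int threadCount) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was acquired, so there is nothing to release
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                    Thread.sleep(20);                      // simulate work on the limited resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + maxConcurrent(3, 10)); // never more than 3
    }
}
```

Acquiring the permit outside the try/finally means release() runs only for permits that were actually obtained.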
Exchanger
Exchanger is a utility class for collaboration between threads, used to exchange data between two of them. It provides a synchronization point at which two threads swap data by calling the exchange method. If the first thread calls exchange first, it waits until the second thread also calls exchange. Once both threads have reached the synchronization point, each receives the data the other passed in. Because exchange() works on pairs, the threads that use an Exchanger should come in pairs.
package com.anzhi.Exchanger;
import java.util.concurrent.Exchanger;
public class Exchanger1 {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try{
s = exchanger.exchange(s);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+""+s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try{
s = exchanger.exchange(s);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+""+s);
},"t2").start();
}
}
LockSupport
LockSupport is a utility class for blocking threads. All its methods are static. park() can block the current thread at any point in the code, and unpark(thread) wakes a parked thread up.
package com.anzhi.LockSupport;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for(int i=0; i<10; i++){
System.out.println(i);
if(i==5){
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) { e.printStackTrace(); }
}
});
t.start();
try {
TimeUnit.SECONDS.sleep(8);
} catch(InterruptedException e) { e.printStackTrace(); }
LockSupport.unpark(t);
}
}
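One detail worth a sketch: unlike wait/notify, unpark may be called before park. unpark hands the thread a permit, and the next park consumes it and returns at once. The class ParkPermitSketch is a hypothetical name, and parkNanos with a 2 second limit is used instead of park() only so that the demo can never hang.

```java
import java.util.concurrent.locks.LockSupport;

class ParkPermitSketch {
    // Returns true if parking after an early unpark returns almost immediately.
    static boolean parkReturnsAfterEarlyUnpark() {
        Thread current = Thread.currentThread();
        LockSupport.unpark(current);            // grant the permit in advance

        long start = System.nanoTime();
        LockSupport.parkNanos(2_000_000_000L);  // consumes the permit instead of waiting 2 seconds
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        return elapsedMillis < 1_000;
    }

    public static void main(String[] args) {
        System.out.println("returned early: " + parkReturnsAfterEarlyUnpark());
    }
}
```

The permit does not accumulate: calling unpark twice still lets only one later park return immediately.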
Example:
Implement a container that provides two methods, add and size. Write two threads: thread 1 adds 10 elements to the container, and thread 2 monitors the number of elements. When the count reaches 5, thread 2 prints a message and ends.
package com.anzhi.ThreadMessage;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
public class ThreadTest1 {
volatile List list = Collections.synchronizedList(new LinkedList<>());
public void add(Object o){
list.add(o);
}
public int size(){
return list.size();
}
public static void main(String[] args) {
ThreadTest1 t = new ThreadTest1();
final Object lock = new Object();
new Thread(()->{
System.out.println("T2 start");
synchronized(lock){
if(t.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("T2 end");
lock.notify(); // wake t1 so it can continue adding
}
},"t2").start();
new Thread(()->{
System.out.println("T1 start");
synchronized (lock) {
for (int i = 0; i < 10; i++) {
t.add(new Object());
System.out.println("add: "+i);
if(t.size() == 5){
lock.notify(); // notify wakes t2 but does not release the lock
try {
lock.wait(); // wait releases the lock so t2 can run
} catch(InterruptedException e) { e.printStackTrace(); }
}
/* try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } */
}
}
},"t1").start();
}
}
Example:
Write a fixed-capacity synchronized container with put, get, and getCount methods that supports blocking calls from 2 producer threads and 10 consumer threads.
package com.anzhi.ThreadMessage;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
 * Write a fixed-capacity synchronized container with put, get, and getCount methods that supports blocking calls from 2 producer threads and 10 consumer threads.
 * This version uses wait and notifyAll. Note that notifyAll wakes every waiting thread: a producer's notifyAll may wake other producers, and a consumer's notifyAll may wake other consumers.
 *
 * First point: this.notifyAll() after count-- handles the case where the container is full and the producers are blocked.
 * When a consumer takes an element it must wake the producers. Without this step only consumers would run;
 * once they had emptied the container, the producers would stay blocked forever and the program would hang.
 *
 * Second point: while (lists.size() == 0) must not be changed to if (lists.size() == 0).
 * Suppose the container is empty and consumers T1 and T2 are both blocked in wait(). Producer T3 adds one element and wakes all threads.
 * T1 takes the element. T2 then resumes from the code after this.wait(); with an if, it would not re-check that the container is empty
 * and would try to remove an element anyway, causing an error. Hence the loop.
 *
 * Third point: this.notifyAll() must not be replaced with this.notify(), which wakes only one thread.
 * Suppose a consumer takes the last element, leaving the container empty, and means to wake a producer. If notify() wakes another consumer instead,
 * that consumer sees lists.size() == 0 and blocks again in this.wait(). No producer is woken, and the program hangs.
 */
public class ThreadMessage<T> {
final private LinkedList<T> lists = new LinkedList<T>();
final private int MAX = 10; // Up to 10 elements
private int count = 0;
public synchronized void put(T t) {
while (lists.size() == MAX) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
count++;
this.notifyAll();// Notify the consumer thread to consume
}
public synchronized T get() {
T t = null;
while (lists.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count--;
this.notifyAll(); // Notify the producer to produce
return t;
}
public static void main(String[] args) {
ThreadMessage<String> c = new ThreadMessage<>();
// Start the consumer thread
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println("Consumption"+ c.get());
}
},"c" + i).start();
}
// Start the producer thread
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
System.out.println("Production" + Thread.currentThread().getName() + "" + j);
c.put(Thread.currentThread().getName() + ""+ j);
}
},"p"+ i).start();
}
}
}
Second approach: Lock and Condition
package com.anzhi.ThreadMessage;
/**
 * Write a fixed-capacity synchronized container with put, get, and getCount methods that supports blocking calls from 2 producer threads and 10 consumer threads.
 *
 * This version uses Lock and Condition. Compared with wait/notifyAll, a Condition lets you specify more precisely which threads to wake.
 */
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadMessage2<T> {
final private LinkedList<T> list = new LinkedList<T>();
final private int MAX = 10; // Up to 10 elements
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition(); //Condition is essentially a different wait queue
private Condition consumer = lock.newCondition();
public void put(T t) {
lock.lock(); // acquire before try, so finally never unlocks a lock that was not acquired
try {
while (list.size() == MAX) {
producer.await();
}
list.add(t);
count++;
consumer.signalAll();// Notify the consumer thread to consume
} catch (InterruptedException e) {
e.printStackTrace();
} finally{ lock.unlock(); }
}
public T get() {
T t = null;
lock.lock(); // acquire before try, so finally never unlocks a lock that was not acquired
try {
while (list.size() == 0) {
consumer.await();
}
t = list.removeFirst();
count--;
producer.signalAll(); // Notify the producer to produce
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
ThreadMessage2<String> c = new ThreadMessage2<>();
// Start the consumer thread
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println("Consumption"+ c.get());
}
},"c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Start the producer thread
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
System.out.println("Production" + Thread.currentThread().getName() + "" + j);
c.put(Thread.currentThread().getName() + ""+ j);
}
},"p"+ i).start();
}
}
}