Small knowledge, big challenge! This paper is participating in theEssentials for programmers”Creative activities

Say first summarized

Both CountDownLatch and CyclicBarrier implement waits between threads, but they have different priorities:

  • CountDownLatch is typically used when thread A waits for several other threads to complete their tasks before it executes.
  • A CyclicBarrier is typically used when a group of threads wait for each other to reach a state and then execute simultaneously.
  • Countdownlatches are not reusable, while cyclicBarriers are.

Semaphore is similar to a lock in that it controls access to a group of resources, whereas a lock controls access to a resource.

A, CountDownLatch

  • The CountDownLatch class, located under the java.util.Concurrent package, enables count-like functionality. For example, if task A is waiting for the other four tasks to complete, CountDownLatch can be used to implement this feature.
  • The CountDownLatch class provides only one constructor:
public CountDownLatch(int count) {};// The argument count is the count value
Copy the code
  • The following three methods are the most important in the CountDownLatch class:
// The thread calling the await() method is suspended and waits until count is 0 before continuing
public void await(a) throws InterruptedException {};// Similar to await(), except to wait a certain amount of time before count has changed to 0
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {};// Subtract count by 1
public void countDown(a) {};Copy the code
  • Code implementation
class Task implements Runnable{
    private static int count = 0;
    private final int id = count++;
    final CountDownLatch latch ;
    public Task(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run(a){
        try {
            print(this+"In process");
            TimeUnit.MILLISECONDS.sleep(3000);
            print(this+"Executed");
            latch.countDown();
        } catch (InterruptedException e) {
            print(this + "Interrupted"); }}@Override
    public String toString(a) {
        return "Task-"+id; }}public class Test {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);
        ExecutorService exec = Executors.newCachedThreadPool();

        exec.execute(new Task(latch));
        exec.execute(new Task(latch));

        try {
            print("Wait for 2 child threads to complete...");
            long start = System.currentTimeMillis();
            latch.await();
            long end = System.currentTimeMillis();

            print("2 child threads have completed execution"+(end - start));
            print("Continue executing the main thread");
        }catch (InterruptedException e){
            print("The main thread is interrupted."); } exec.shutdown(); }} # output: wait2The child thread completes... Task-0Executing Task-1Executing Task-0Task- Is completed1completed2The child thread has finished executing3049Continue executing the main threadCopy the code

Second, the CyclicBarrier

  • A literal loopback barrier that allows a group of threads to wait until a certain state before all execute simultaneously. Cyclicbarriers are called loops because they can be reused once all waiting threads have been released. We’ll call this state a barrier for now, and the thread is in a barrier after calling the await() method.
  • The CyclicBarrier class is located under the java.util.concurrent package and CyclicBarrier provides two constructors:
  • The parties parameter specifies how many threads or tasks to wait until the barrier state
  • The barrierAction argument is what will be executed when these threads all reach the Barrier state
public CyclicBarrier(int parties, Runnable barrierAction) {}
public CyclicBarrier(int parties) {}
Copy the code
  • Then the most important method in CyclicBarrier is the await method, which has two overloaded versions:
  • The first version is commonly used to suspend the current thread until all threads reach the barrier state and then execute subsequent tasks simultaneously.
  • The second version makes these threads wait for a certain amount of time, and if there are any threads that haven’t reached the Barrier state, the threads that have reached the barrier do the subsequent tasks.
public int await(a) throws InterruptedException, BrokenBarrierException {};public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {};Copy the code
  • The code shown
class WriteTask implements Runnable{
    private static int count = 0;
    private final int id = count++;
    private CyclicBarrier barrier ;
    private static Random random = new Random(47);
    public WriteTask(CyclicBarrier cyclicBarrier) {
        this.barrier = cyclicBarrier;
    }

    @Override
    public void run(a) {
        print(this+"Start writing data...");
        try {
            // Use sleep to simulate writing data
            TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));      
            print(this+"Write data complete, wait for other threads to write data complete."+""+System.currentTimeMillis());
            barrier.await();
        } catch (InterruptedException e) {
            print(this + "is interrupted!");
        }catch(BrokenBarrierException e){
           throw new RuntimeException(e);
        }
        print("All tasks are written, proceed to other tasks... "+System.currentTimeMillis());
    }

    @Override
    public String toString(a) {
        return getClass().getSimpleName()+"-"+id; }}public class CyclicBarrierTest {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < N; ++i){
            exec.execute(newWriteTask(barrier)); } exec.shutdown(); }} # output: WriteTask-3Start writing data... WriteTask-2Start writing data... WriteTask-1Start writing data... WriteTask-0Start writing data... WriteTask-2After data is written, wait for other threads to complete writing1512048648904
WriteTask-1After data is written, wait for other threads to complete writing1512048650042
WriteTask-0After data is written, wait for other threads to complete writing1512048650209
WriteTask-3After data is written, wait for other threads to complete writing1512048652606After all tasks are written, continue processing other tasks...1512048652607After all tasks are written, continue processing other tasks...1512048652607After all tasks are written, continue processing other tasks...1512048652607After all tasks are written, continue processing other tasks...1512048652607
Copy the code
  • If you want to perform additional operations on a CyclicBarrier after all thread writes are complete, you can provide the Runnable parameter to CyclicBarrier:
class WriteTask implements Runnable{
    private static int count = 0;
    private final int id = count++;
    private CyclicBarrier barrier ;
    private static Random random = new Random(47);
    public WriteTask(CyclicBarrier cyclicBarrier) {
        this.barrier = cyclicBarrier;
    }

    @Override
    public void run(a) {
        print(this+"Start writing data...");
        try {
            // Use sleep to simulate writing data
            TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));     
            print(this+"Write data complete, wait for other threads to write data complete."+""+System.currentTimeMillis());
            barrier.await();
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            print(this + "is interrupted!");
        }catch(BrokenBarrierException e){
           throw new RuntimeException(e);
        }
        print("All tasks are written, proceed to other tasks... "+System.currentTimeMillis()+Thread.currentThread());
    }

    @Override
    public String toString(a) {
        return getClass().getSimpleName()+"-"+id; }}public class CyclicBarrierTest {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run(a) { print(Thread.currentThread()); }}); ExecutorService exec = Executors.newCachedThreadPool();for(int i = 0; i < N; ++i){
            exec.execute(newWriteTask(barrier)); } exec.shutdown(); }} # output: WriteTask-3Start writing data... WriteTask-1Start writing data... WriteTask-2Start writing data... WriteTask-0Start writing data... WriteTask-1After data is written, wait for other threads to complete writing1512049061954
WriteTask-2After data is written, wait for other threads to complete writing1512049063092
WriteTask-0After data is written, wait for other threads to complete writing1512049063261
WriteTask-3After data is written, wait for other threads to complete writing1512049065657
Thread[pool-1-thread-4.5,main] All tasks are written, continue to process other tasks... 1512049065668Thread[pool-1-thread-2.5,main] All tasks are written, continue to process other tasks... 1512049065668Thread[pool-1-thread-1.5,main] All tasks are written, continue to process other tasks... 1512049065668Thread[pool-1-thread-4.5,main] All tasks are written, continue to process other tasks... 1512049065668Thread[pool-1-thread-3.5,main]
Copy the code

As you can see from the result, when all four threads reach the barrier state, one of the four threads is selected to execute the Runnable.

  • Cyclicbarriers can also be reused, as shown in the following example:
class WriteTask implements Runnable{
    private static int count = 0;
    private final int id = count++;
    private CyclicBarrier barrier ;
    private static Random random = new Random(47);
    public WriteTask(CyclicBarrier cyclicBarrier) {
        this.barrier = cyclicBarrier;
    }

    @Override
    public void run(a) {

        while(! Thread.interrupted()){ print(this+"Start writing data...");
            try {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));      
                print(this+"Write data complete, wait for other threads to write data complete."+""+System.currentTimeMillis());
                barrier.await();
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                print(this + "is interrupted!");
            }catch(BrokenBarrierException e){
                throw new RuntimeException(e);
            }
            print("All tasks are written, proceed to other tasks... "+System.currentTimeMillis()); }}@Override
    public String toString(a) {
        return getClass().getSimpleName()+"-"+id; }}class CyclicBarrierManager implements Runnable{
    private CyclicBarrier barrier ;
    private ExecutorService exec;
    public CyclicBarrierManager(CyclicBarrier barrier, ExecutorService exec,int N){
        this.barrier = barrier ;
        this.exec = exec;
        for (int i = 0; i < N-1; ++i){
            exec.execute(newWriteTask(barrier)); }}@Override
    public void run(a){
        while(! Thread.interrupted()){try {
                barrier.await();
            }catch (InterruptedException e){
                print(getClass().getSimpleName()+"Interrupted!");
            }catch (BrokenBarrierException e){
                throw newRuntimeException(e); }}}}public class CyclicBarrierTest {
    public static void main(String[] args) throws Exception{
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(newCyclicBarrierManager(barrier,exec,N)); exec.shutdown(); }} # output: WriteTask-1Start writing data... WriteTask-2Start writing data... WriteTask-0Start writing data... WriteTask-2After data is written, wait for other threads to complete writing1512051484365
WriteTask-0After data is written, wait for other threads to complete writing1512051485503
WriteTask-1After data is written, wait for other threads to complete writing1512051488068After all tasks are written, continue processing other tasks...1512051488078After all tasks are written, continue processing other tasks...1512051488078
WriteTask-2Start writing data... After all tasks are written, continue processing other tasks...1512051488078
WriteTask-1Start writing data... WriteTask-0Start writing data... WriteTask-0After data is written, wait for other threads to complete writing1512051488513
WriteTask-1After data is written, wait for other threads to complete writing1512051489045
WriteTask-2After data is written, wait for other threads to complete writing1512051489945After all tasks are written, continue processing other tasks...1512051489955
WriteTask-0Start writing data... After all tasks are written, continue processing other tasks...1512051489955After all tasks are written, continue processing other tasks...1512051489955
WriteTask-2Start writing data... WriteTask-1Start writing data... WriteTask-2After data is written, wait for other threads to complete writing1512051490155
WriteTask-1After data is written, wait for other threads to complete writing1512051494477
WriteTask-0After data is written, wait for other threads to complete writing1512051494823After all tasks are written, continue processing other tasks...1512051494833After all tasks are written, continue processing other tasks...1512051494833
WriteTask-0Start writing data... After all tasks are written, continue processing other tasks...1512051494833
WriteTask-1Start writing data... WriteTask-2Start writing data... WriteTask-2After data is written, wait for other threads to complete writing1512051494961
WriteTask-0After data is written, wait for other threads to complete writing1512051496040
WriteTask-1After data is written, wait for other threads to complete writing1512051498121After all tasks are written, continue processing other tasks...1512051498132After all tasks are written, continue processing other tasks...1512051498132
WriteTask-1Start writing data... After all tasks are written, continue processing other tasks...1512051498132
Copy the code

Third, Semaphore

  • Semaphore translates literally to Semaphore. Semaphore allows multiple threads to access a shared resource simultaneously, acquire a license through acquire(), wait if not, and release() releases a license.
  • The Semaphore class, located under the java.util.concurrent package, provides two constructors:
// Permitting indicates the number of permits, that is, how many threads are allowed to access at the same time
public Semaphore(int permits) {          
    sync = new NonfairSync(permits);
}
// If it is fair, the longer you wait, the sooner you get the license
public Semaphore(int permits, boolean fair) {    
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
Copy the code
  • Acquire (); release(); Semaphore ();
public void acquire(a) throws InterruptedException {}// Get a license
public void acquire(int permits) throws InterruptedException {}// Obtain permits
public void release(a) {}          // Release a license
public void release(int permits) {}    // Release permitting
Copy the code
  • Acquire () is used to obtain a license, and if none can be obtained, it waits until the license is obtained.
  • Release () is used to release permissions.
  • Note that permission must be obtained before release.

All four methods block. If you want immediate results, you can use the following methods:

// Try to obtain a license, return true immediately on success, false immediately on failure
public boolean tryAcquire(a) {};// Try to obtain a license, return true immediately if successful within the specified time, false immediately otherwise
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {};// Attempts to obtain permits, true immediately if successful, false immediately if failed
public boolean tryAcquire(int permits) {};// Attempt to obtain permits immediately true if successful within the specified time, otherwise immediately false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {};package sychronized;
import java.util.Random;
import java.util.concurrent.*;
import static net.mindview.util.Print.*;

class Worker implements Runnable{
    private static int count = 0;
    private final int id = count++;
    private int finished = 0;
    private Random random = new Random(47);
    private Semaphore semaphore;
    public Worker(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override 
    public void run(a){
        try {
            while(! Thread.interrupted()){ semaphore.acquire(); print(this+"Take up a machine in production... ");
                TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
                synchronized (this){
                    print("It's already in production."+(++finished)+"A product,"+"Unleash the machine."); } semaphore.release(); }}catch(InterruptedException e) { e.printStackTrace(); }}@Override
    public String toString(a) {
        return getClass().getSimpleName()+"-"+id; }}public class SemaphoreTest {
    public static void main(String[] args) {
        int N = 8;            / / the number of workers
        Semaphore semaphore = new Semaphore(5); // Number of machines
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < N; ++i){
            exec.execute(newWorker(semaphore)); } exec.shutdown(); }}Copy the code