We are all in the gutter, but some of us are looking at the stars.

Contents

  1. Thread Pools and ThreadLocal – JUC (1)
  2. Locks and CAS – JUC (2)
  3. Concurrent Containers and Concurrency Control – JUC (3)
  4. AQS – JUC (4)

Abstract

  • Why does no one use Vector and Hashtable anymore? Where is HashMap not thread safe?
  • How has ConcurrentHashMap evolved, and what clever operations does it offer?
  • What is the idea of copy-on-write? What are some examples?
  • Why do we need concurrent queues? Which concurrent queues have we been overlooking?
  • What do we do when we want to control how threads come and go, one by one?

Concurrent containers

What's going on with these concurrent containers?

  1. Concurrent*: most are implemented with CAS.
  2. CopyOnWrite*: writes operate on a copy of the original data.
  3. Blocking*: implemented with locks (ReentrantLock + Condition).

Why have Vector and Hashtable been left behind by history?

It's simple, don't overthink it: the performance is just bad.

So why is the performance bad? Where is it lost?

Let's take a look at the get method on Vector:

// java.util.Vector: every public method, get included, is synchronized,
// so all reads and writes serialize on the same instance monitor
public synchronized E get(int index) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);
    return elementData(index);
}

So one might ask: why do synchronized methods perform poorly? Because the lock granularity is the entire instance object; every call contends for the same monitor.

The same is true for Hashtable.

Why is HashMap thread unsafe?

  1. Simultaneous puts colliding causes data loss

    For example, if two threads put values with the same key at the same time, the hashes are identical and both land in the same slot, so one thread's write inevitably overwrites the other's and data is lost.

  2. Simultaneous put-triggered resizing causes data loss

    When several threads trigger a resize at the same time, only one of the expanded arrays is kept, so entries migrated into the others are lost.

  3. An infinite loop drives the CPU to 100%

    When multiple threads resize simultaneously (on JDK 7 and earlier), a circular linked list can form and pin the CPU at 100%. But there is little point digging deeper into this (even Sun's engineers would say that using HashMap in a concurrent scenario is the real problem, right?). If you are really interested, see coolshell.cn/articles/96…
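The data-loss problem above is easy to reproduce. Here is a small sketch (class and method names are my own) that hammers a map with distinct keys from several threads: a ConcurrentHashMap always ends up with every entry, while a plain HashMap usually loses some, and can even throw.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LostUpdateDemo {
    static final int THREADS = 4, PER_THREAD = 50_000;

    // Hammer the map from several threads, each writing its own distinct keys,
    // then report how many entries survived.
    static int parallelPut(Map<Integer, Integer> map) {
        Thread[] workers = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            final int base = t * PER_THREAD;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < PER_THREAD; i++) {
                    map.put(base + i, i);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException ignored) { }
        }
        return map.size();
    }

    public static void main(String[] args) {
        int expected = THREADS * PER_THREAD;
        // ConcurrentHashMap never drops an entry.
        System.out.println("ConcurrentHashMap: " + parallelPut(new ConcurrentHashMap<>()) + " / " + expected);
        try {
            // A plain HashMap typically loses entries during racing puts and resizes...
            System.out.println("HashMap: " + parallelPut(new HashMap<>()) + " / " + expected);
        } catch (RuntimeException e) {
            // ...and may even throw outright.
            System.out.println("HashMap failed: " + e);
        }
    }
}
```

Note the HashMap result varies from run to run; only the ConcurrentHashMap count is deterministic.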

What is the difference between ConcurrentHashMap 1.7 and 1.8?

  1. The data structure

    1.7: Segment + HashEntry + Unsafe

    1.8: Synchronized + CAS + Node + Unsafe (segments removed, making the lock granularity smaller)

  2. Hash collision

    If the number of nodes in a bucket exceeds 8, the original chaining (linked-list) structure is converted into a red-black tree.

    So here's a little question: why convert to a red-black tree after 8? Why 8? And why not just use red-black trees from the start?

    1. A red-black tree node takes up about twice the space of a linked-list node.
    2. The authors calculated the probabilities from a Poisson distribution; the chance of a bucket reaching 8 nodes is vanishingly small.
  3. Ensure concurrency security

    1.7 uses Segment locks (Segment extends ReentrantLock) to protect 16 segments (segments cannot be added dynamically); 1.8 uses CAS + synchronized to lock at the granularity of a single Node.

  4. Query complexity

    Even when a bucket has more than 8 collisions, query efficiency is preserved: the O(n) of chaining becomes the O(log n) of the red-black tree.

How does ConcurrentHashMap implement combined operations such as a++?

ConcurrentHashMap guarantees that concurrent puts are safe, but that does not make a combined read-modify-write like a++ thread safe:

int a = map.get(key);
int b = a+1;
map.put(key, b);

So let's think: how do we solve this problem?

Should we just wrap the whole thing in synchronized?

But then what's the point of using ConcurrentHashMap? Let's look at one of its methods: replace

// Using the CAS idea: loop, read the value, compute ++, and retry until it succeeds.
// replace(key, expect, update) only writes when the current value still equals
// expect, so the whole read-modify-write is safe under concurrent modification.
while (true) {
    int ori = map.get("key");
    int tar = ori + 1;
    boolean isSuccess = map.replace("key", ori, tar);
    if (isSuccess) {
        break;
    }
}

Let's look at another combined operation, one that solves the overwrite problem when two threads put the same key: putIfAbsent

// The core idea, which putIfAbsent performs atomically:
if (!map.containsKey("key")) {
    return map.put("key", "value");
} else {
    return map.get("key");
}
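On Java 8 and later, you usually don't even need to hand-roll the CAS loop: ConcurrentHashMap's compute and merge perform the read-modify-write atomically per key. A minimal sketch (class and method names are my own):

```java
import java.util.concurrent.ConcurrentHashMap;

public class MergeCounterDemo {
    // Four threads, 10_000 increments each; merge() makes each
    // read-modify-write atomic for the key, so no increment is lost.
    static int count() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < 4; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    map.merge("counter", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException ignored) { }
        }
        return map.getOrDefault("counter", 0);
    }

    public static void main(String[] args) {
        System.out.println(count()); // always 40000
    }
}
```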

What are the applicable scenarios of CopyOnWriteArrayList?

Let's think about why this thing exists in the first place.

  1. It is used to replace Vector and SynchronizedList, for the same reason that ConcurrentHashMap replaces SynchronizedMap.
  2. The lock granularity of Vector and SynchronizedList is too large, concurrency is poor, and they cannot be modified during traversal.
  3. The copy-on-write family also includes CopyOnWriteArraySet, used to replace synchronized Sets.

Ok, now let’s talk about scenarios

  1. Read-heavy, write-light workloads

    Reads need to be as fast as possible, while writes can afford to be slow; a blacklist is a classic example: checked on every request, updated rarely.

  2. Low real-time requirements on writes

    That is, my changes don't have to take effect immediately; if readers keep seeing the previous data for a while, that's fine.

What are the read and write rules for CopyOnWriteArrayList?

  • Recall the rules of a read-write lock: concurrent reads share, while read-write and write-write are mutually exclusive

  • CopyOnWriteArrayList looks a lot like that, doesn't it? But it's different; there's an upgrade here: I can keep reading even while you are writing.

  • So let's look at a write

    Lock, copy the underlying array, insert the new element into the copy, swap the array reference over to the copy, and unlock.

  • And what about a read?

    No protection at all; just read.

What is the implementation principle?

  • The CopyOnWrite principle: when writing, copy first, then swap the array reference after the write completes
  • The "immutability" principle: each snapshot array is effectively an immutable collection; no modification is ever performed on it in place
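These snapshot semantics can be observed directly: an iterator keeps reading the array it started with, even while the list is being modified, so the for-each loop below never throws ConcurrentModificationException. A sketch with names of my own:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotDemo {
    // Iterate while modifying: legal on CopyOnWriteArrayList because the
    // iterator holds a reference to the old (immutable) snapshot array.
    static int iterateWhileAdding(List<Integer> list) {
        int seen = 0;
        for (Integer i : list) {
            seen++;
            list.add(i + 100); // the write goes to a fresh copy; the iterator is unaffected
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> cow = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        System.out.println(iterateWhileAdding(cow)); // sees only the original 3 elements
        // The same loop on a plain ArrayList throws ConcurrentModificationException.
    }
}
```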

What are the disadvantages of that?

  • Data consistency: only eventual consistency is guaranteed, not real-time consistency.
  • Memory footprint: every write duplicates the array, briefly doubling memory use.

What is a concurrent queue?

These are concurrency-safe queues. They can be divided into blocking queues and non-blocking queues; the difference is whether a thread blocks when putting an element into a full queue (or taking from an empty one).

So what are the characteristics of each of these queues? Let's go through them one by one.

  1. ArrayBlockingQueue: bounded, capacity must be specified, backed by an array; fairness can be configured

    Inserting an element goes through the reentrant lock's interruptible locking method (lockInterruptibly).

  2. PriorityBlockingQueue: a thread-safe version of PriorityQueue; supports priorities (based on the elements' natural ordering or a Comparator passed to the constructor), so entry and exit order is not FIFO; an unbounded queue.

  3. LinkedBlockingQueue: unbounded by default (size Integer.MAX_VALUE; a capacity can be specified); internally a linked list of nodes plus two locks (one for put, one for take); generally performs better than ArrayBlockingQueue because the lock granularity is smaller

    Internally it uses Condition + ReentrantLock, which is essentially the same mechanism as synchronized + Object.wait() + Object.notify(). This makes it a great queue for producer-consumer work. If you want to see the hand-rolled equivalent, look at the producer-consumer I implemented with wait + notify in the concurrency-principles post.

    On put: if the queue is full, block; otherwise insert the element. If there is still room afterwards, wake a thread waiting on notFull; and if the queue was empty before this insert, wake a thread waiting on notEmpty.

  4. SynchronousQueue: its capacity is 0, not 1; it is the blocking queue used by Executors.newCachedThreadPool()

    It has no real capacity. Any thread, producer or consumer (producer-side operations such as put and offer; consumer-side operations such as take and poll), waits until it has handed data to, or received data from, its counterpart. A producer thread's mission is to deliver its data directly to a consumer thread, and a consumer thread waits for data from a producer thread.

  5. ConcurrentLinkedQueue: the only non-blocking queue in the java.util.concurrent package, implemented internally as a linked list with CAS at its core.

  6. DelayQueue: a delay queue (elements are ordered by delay time); elements must implement the Delayed interface (which defines the ordering rule)
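As a quick illustration of the producer-consumer fit mentioned above, here is a minimal sketch (names and the sentinel value are my own) where put() blocks on a full bounded queue and take() blocks on an empty one:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {
    static final int POISON = -1; // hypothetical sentinel telling the consumer to stop

    static int produceAndConsume(int items) {
        // Bounded at 10: put() blocks when full, take() blocks when empty.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) queue.put(i);
                queue.put(POISON);
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        int consumed = 0;
        try {
            while (true) {
                int v = queue.take(); // blocks until the producer supplies data
                if (v == POISON) break;
                consumed++;
            }
            producer.join();
        } catch (InterruptedException ignored) { }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(100)); // 100
    }
}
```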

Controlling concurrency flow

Let's take a look at the tools for controlling how concurrent threads flow.

What are the typical application scenarios of CountDownLatch?

  1. One thread waits for multiple threads to complete execution

    In media processing, the concatenation command is run only after every segment to be concatenated has finished downloading.

  2. Multiple threads wait for a thread to finish executing

    In a load-test scenario, all threads wait for a single command and then hit the server at the same instant, instead of ramping up gradually as threads are created.

  3. And more.

Here is a small example simulating runners in a race: every runner waits for the starting gun, and once all runners have crossed the finish line the referee declares the race over.

CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(10);
// Pool of 10 so that all ten runners really are standing by before the gun.
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
    final int no = i + 1;
    Runnable runnable = () -> {
        System.out.println("No." + no + " standing by, waiting for the gun.");
        try {
            begin.await();
            System.out.println("No." + no + " is off.");
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("No." + no + " has crossed the finish line.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            end.countDown();
        }
    };
    service.submit(runnable);
}
// The referee does some preparatory work...
Thread.sleep(5000);
System.out.println("Shots fired, the race is on!");
begin.countDown();

end.await();
System.out.println("Everyone is at the finish line; the race is over!");

This class cannot be reused; its count cannot be reset. If you need reuse, consider CyclicBarrier or simply create a new instance.
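The one-shot nature is easy to see in a sketch (class and method names are my own): once the count hits zero the gate stays open, and further countDown() calls are no-ops:

```java
import java.util.concurrent.CountDownLatch;

public class OneShotLatchDemo {
    static long fireTwice() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();               // count: 1 -> 0, the gate opens
        try {
            latch.await();               // returns immediately: count is already 0
        } catch (InterruptedException ignored) { }
        latch.countDown();               // counting past zero is a no-op
        return latch.getCount();         // still 0; the latch cannot be re-armed
    }

    public static void main(String[] args) {
        System.out.println(fireTwice()); // 0
    }
}
```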

What are the applicable scenarios of Semaphore?

When you need to control the number of concurrent threads.

For example, in the scenario of media processing, the CPU performance of a machine is limited, and we can only allow three transcoding tasks to be carried out at the same time at most. At this time, it is necessary to control the number of transcoding threads.

Let's make it concrete with a demo:

static Semaphore semaphore = new Semaphore(3, true);

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(50);
    for (int i = 0; i < 100; i++) {
        service.submit(new Task());
    }
    service.shutdown();
}

static class Task implements Runnable {

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // Or acquire several permits at once:
            // semaphore.acquire(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " got a permit.");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " released a permit.");
        semaphore.release();
        // The matching multi-permit release:
        // semaphore.release(2);
    }
}

Note:

  1. The number of permits acquired and released must match (for example, if you acquire 3 but only release 2 each time, permits quietly leak away; this is not enforced at the class level, so code carefully).
  2. Fairness is usually best set to true (a semaphore typically guards a slow service; making it unfair as well can easily lead to starvation).
  3. A permit does not have to be released by the thread that acquired it; another thread can release it (thread A can acquire while thread B, which never acquired anything, performs the release).
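Point 3 in action: a sketch (class and method names are my own) where the semaphore starts with zero permits, one thread releases a permit it never acquired, and that unblocks another thread's acquire():

```java
import java.util.concurrent.Semaphore;

public class CrossThreadReleaseDemo {
    // Start with zero permits: acquire() can only succeed once someone releases.
    static boolean handOff() {
        Semaphore gate = new Semaphore(0);
        Thread releaser = new Thread(() -> {
            // This thread never acquired anything, yet it may release:
            gate.release();
        });
        releaser.start();
        try {
            gate.acquire(); // blocks until the other thread's release()
            releaser.join();
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // true
    }
}
```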

What is the difference between CyclicBarrier and CountDownLatch?

  1. They work differently: CyclicBarrier waits for a fixed number of threads to reach the barrier before any of them continues, whereas CountDownLatch only counts events down to zero. In other words, CountDownLatch is for events and CyclicBarrier is for threads.

    See a CyclicBarrier Demo:

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("Everyone has arrived, let's move out!"));
    for (int i = 0; i < 10; i++) {
        new Thread(new Task(i, cyclicBarrier)).start();
    }
}

static class Task implements Runnable {
    private int id;
    private CyclicBarrier cyclicBarrier;

    public Task(int id, CyclicBarrier cyclicBarrier) {
        this.id = id;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("Thread " + id + " is heading to the assembly point.");
        try {
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("Thread " + id + " reached the assembly point; waiting for the others.");
            cyclicBarrier.await();
            System.out.println("Thread " + id + " moving out.");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
  2. Different reusability: new CyclicBarrier(5) releases once five threads have arrived and then automatically resets, ready to wait for the next five; a CountDownLatch cannot be reset.
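A sketch of that reusability (class and method names are my own): two parties cross the same barrier twice, and the barrier action fires once per round:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseDemo {
    // Two parties, two rounds, one barrier: the action runs once per round.
    static int runTwoRounds() {
        AtomicInteger actionRuns = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, actionRuns::incrementAndGet);
        Runnable party = () -> {
            try {
                barrier.await(); // round 1
                barrier.await(); // round 2: same barrier, automatically reset
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
        };
        Thread a = new Thread(party), b = new Thread(party);
        a.start();
        b.start();
        try { a.join(); b.join(); } catch (InterruptedException ignored) { }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwoRounds()); // 2
    }
}
```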

Conclusion

Whew, now that we're done with these fancy concurrent containers, we know that CopyOnWrite handles read-heavy, write-light scenarios; blocking queues implement the producer-consumer pattern; and CountDownLatch tames fast threads.

In the next chapter we take on something big, the boss behind all of this: AQS. We'll try implementing a concurrency tool of our own on top of it.