Concurrent containers

Concurrent containers can be divided into three basic categories: Concurrent*, CopyOnWrite, and Blocking.

① Concurrent*: these generally use CAS to guarantee thread safety efficiently, without the heavy modification overhead of containers such as CopyOnWrite. The trade-off is that traversal is only weakly consistent: an iterator can keep going while the container changes. This is in contrast to fail-fast iterators, which throw ConcurrentModificationException when they detect that the container was modified during traversal.

② CopyOnWrite: CopyOnWriteArrayList locks with a ReentrantLock when adding an element, copies the underlying array, adds the element to the copy, and finally points the array reference at the new array. Reads therefore need no lock, and a write is just a reference swap; a reader always sees a complete old array rather than a half-written one, so there are no dirty reads. The main purpose of CopyOnWriteArrayList is lock-free concurrent traversal, whereas a plain ArrayList iterator is fail-fast.

③ Blocking: blocking containers are implemented with ReentrantLock (Lock + Condition). The lock is acquired first to guarantee exclusive access to the queue, and producers and consumers then coordinate through the Condition's await and signal.
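To make the Lock + Condition pattern concrete, here is a minimal sketch of a bounded blocking buffer; the class name SimpleBlockingQueue and its details are illustrative, not the JDK implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal bounded blocking buffer: one lock guards the queue,
// two Conditions let producers and consumers wait for and signal each other.
public class SimpleBlockingQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public SimpleBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();               // acquire the lock (exclusive access)
        try {
            while (items.size() == capacity) {  // full: wait until a consumer signals notFull
                notFull.await();
            }
            items.addLast(e);
            notEmpty.signal();                  // wake up one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {           // empty: wait until a producer signals notEmpty
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal();                   // wake up one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```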

There are also the Collections.synchronizedXXX(new XXX()) wrappers.

Queue

ConcurrentLinkedQueue source

It has a head node and a tail node marking the two ends, and nodes are linked to one another by a next reference.

Enqueuing essentially means the current thread locates the tail of the queue and sets the tail's next to the new node. This happens in two steps: casNext links the enqueued node after the tail, and casTail updates the tail pointer.

  • ① If another thread modifies the tail node in the meantime, the current thread's CAS fails and it re-reads the tail node.
  • ② casTail updates the tail pointer only when the distance between the real tail and the current tail pointer is greater than or equal to HOPS (default 1). This design reduces the number of CAS updates on the tail and improves enqueue efficiency.

Dequeuing pops the element at the head of the queue and updates the head with casHead, using the same HOPS-style optimization.
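As an illustration of the casNext/casTail two-step, here is a simplified Michael-Scott style enqueue built on AtomicReference; it omits the HOPS optimization and other details of the real ConcurrentLinkedQueue source:

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified CAS enqueue: casNext links the new node after the tail,
// casTail then swings the tail pointer. Retries when another thread wins the race.
public class SimpleCasQueue<E> {
    static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    public SimpleCasQueue() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    public void offer(E item) {
        Node<E> newNode = new Node<>(item);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> tailNext = curTail.next.get();
            if (tailNext != null) {
                // tail pointer is lagging behind; help advance it and retry
                tail.compareAndSet(curTail, tailNext);
            } else if (curTail.next.compareAndSet(null, newNode)) {
                // "casNext" succeeded: newNode is now the real tail; try to swing tail
                tail.compareAndSet(curTail, newNode);   // "casTail" (may fail harmlessly)
                return;
            }
            // another thread modified the tail in the meantime: loop and re-read it
        }
    }
}
```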

BlockingQueue

LinkedBlockingQueue: a queue implemented with a linked list that orders elements first-in, first-out. Its capacity is optional and defaults to Integer.MAX_VALUE, which makes it effectively unbounded.

LinkedBlockingQueue maintains an internal linked list as a buffer and limits the maximum size through capacity. It maintains a separate ReentrantLock and Condition pair for writes and for reads (putLock/notFull and takeLock/notEmpty), so producers and consumers can operate on the queue in parallel.

Enqueuing essentially works like this: first putLock.lockInterruptibly() acquires the lock; if there is remaining capacity the element is enqueued, and if the queue is full the thread blocks on the Condition's wait queue; finally the lock is released.

put(): if the queue has reached its maximum capacity, the thread enters the Condition's wait queue.

offer(): returns false if the queue has reached its maximum capacity.

offer(E e, long timeout, TimeUnit unit): retries in a loop using Condition.awaitNanos(time) and gives up when the timeout expires.

Dequeuing likewise uses Lock + Condition.

poll(): returns null if the queue is empty.

take(): if the queue is empty, the thread blocks on the Condition's wait queue.
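A small usage sketch of the blocking behaviour described above: a bounded LinkedBlockingQueue shared between one producer and one consumer (class and variable names are illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Producer-consumer hand-off through a bounded LinkedBlockingQueue:
// put() blocks when the queue is full, take() blocks when it is empty.
public class LbqDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); // capacity 2

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i);                         // blocks while the queue is full
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Integer v = queue.take();             // blocks while the queue is empty
                    System.out.println("consumed " + v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```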

ArrayBlockingQueue: a bounded queue backed by an array that orders elements first-in, first-out. Passing true as the second constructor argument makes the internal reentrant lock fair, giving waiting threads fair access.

ArrayBlockingQueue maintains a buffer array internally, so it must be initialized with a capacity. Unlike LinkedBlockingQueue, it uses a single ReentrantLock for both reads and writes, so its concurrency is lower. On the other hand, a LinkedBlockingQueue created without a capacity is effectively unbounded and can lead to OOM.

PriorityBlockingQueue: an unbounded queue that supports priorities and orders elements using a Comparator.

DelayQueue: an unbounded queue that supports delayed retrieval. Each element specifies at creation time how long until it may be dequeued, and an internal PriorityQueue orders elements by that time. It can be used as a scheduled-task queue.

SynchronousQueue: a blocking queue that does not store elements. Every put must wait for a matching take, otherwise no element can be added. It can be made fair, meaning waiting threads access the queue in first-in, first-out order. It can therefore stand in for a CountDownLatch(1) style hand-off.

LinkedTransferQueue: an unbounded queue implemented with a linked list that adds transfer(): if a consumer is already waiting to receive an element, the producer hands the element to that consumer directly; otherwise the element is enqueued and the producer spins waiting for a consumer (after spinning for a while it calls Thread.yield() to back off).

LinkedBlockingDeque: a double-ended blocking queue implemented with a linked list, with additional methods to read and remove elements at the tail of the queue. It can be used for work stealing.

Deadlock prevention

Deadlock: a group of threads competing for resources block permanently, each waiting for a resource held by another.

Deadlocks are prevented by breaking the conditions necessary for a deadlock to occur:

  • ① Mutual exclusion: a shared resource can be held by only one thread at a time. Mutual exclusion is the very purpose of locking, so this condition cannot be broken.
  • ② Hold and wait: a thread keeps the shared resources it already holds while waiting for other resources.

Solution: have a separate coordinator apply for all the resources the current thread needs in one go. Either the whole application fails, or, if it succeeds, the coordinator releases the resources after the current thread has finished its work.

  • ③ Non-preemption: Resources occupied by the current thread cannot be forcibly preempted by other threads.

Solution: use the Lock API (tryLock with a timeout, or lockInterruptibly), which lets a thread give up the resources it already holds when it cannot acquire the rest (this ties back to the earlier Lock section).

  • ④ Circular wait: threads wait in a cycle for resources held by one another.

Solution: impose an ordering on the shared resources and always acquire them in that order (see the sketch below).
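As a sketch of breaking the circular-wait condition, the classic account-transfer example below always acquires the two locks in ascending id order; the Account class and its fields are hypothetical:

```java
// Breaking circular wait: always lock the two accounts in the same global
// order (here: ascending id), so no cycle of lock acquisition can form.
public class Account {
    private final long id;       // used only to order lock acquisition
    private long balance;

    public Account(long id, long balance) {
        this.id = id;
        this.balance = balance;
    }

    public void transferTo(Account target, long amount) {
        Account first  = this.id < target.id ? this : target;
        Account second = this.id < target.id ? target : this;
        synchronized (first) {           // the smaller id is always locked first
            synchronized (second) {
                this.balance -= amount;
                target.balance += amount;
            }
        }
    }
}
```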

Executor framework

Java separates a thread's unit of work from its execution mechanism: Runnable and Callable are the units of work, and executors provide the execution mechanism. A Java multithreaded program breaks the application into tasks, the Executor framework maps those tasks onto threads, and the operating system maps those threads onto CPUs.

FutureTask:

FutureTask implements the Runnable interface, so it can be executed with executorService.submit() and represents an asynchronous result: call futureTask.get() to block for the result, and futureTask.cancel() to abort a task that has not yet started; cancellation has no effect once the FutureTask is already running (unless interruption is requested with cancel(true)).
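A minimal usage sketch: a Callable wrapped in a FutureTask, submitted to an executor, and then retrieved with get():

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Run a Callable wrapped in a FutureTask, then block on get() for the result.
public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            Thread.sleep(100);          // simulate some work
            return 42;
        });

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(futureTask);        // FutureTask is a Runnable, so it can be submitted

        System.out.println("result = " + futureTask.get()); // blocks until done
        // futureTask.cancel(false) would only succeed if the task had not started yet
        pool.shutdown();
    }
}
```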

CompletableFuture:

It is used together with a thread pool. It implements the Future and CompletionStage interfaces, cannot itself be submitted as a task, and supports non-blocking asynchronous callbacks through functional interfaces.

Under the hood it uses a ForkJoinPool: the Supplier passed in is wrapped in an AsyncSupply task and placed in the pool, where a worker thread runs the Supplier's get(), much like waiting for a FutureTask to complete. In the main thread you call CompletableFuture.supplyAsync(supplier, myThreadPool) (or runAsync for a Runnable); each thenApply(f) wraps the function f as a completion object and pushes it onto the CompletableFuture's stack, and when the asynchronous thread finishes, those stacked callbacks are popped and executed.
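A small sketch of the callback chain described above, using supplyAsync with an explicit pool (myThreadPool here is just an example executor):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// supplyAsync runs the Supplier on the given pool; thenApply registers a
// non-blocking callback that transforms the result when it completes.
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService myThreadPool = Executors.newFixedThreadPool(2); // example pool

        CompletableFuture<String> cf = CompletableFuture
                .supplyAsync(() -> 21, myThreadPool)   // asynchronous Supplier
                .thenApply(n -> n * 2)                 // callback pushed onto the stack
                .thenApply(n -> "answer = " + n);      // runs after the previous stage

        System.out.println(cf.join());                 // waits for the chain to finish
        myThreadPool.shutdown();
    }
}
```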

In the Executor framework, ExecutorService is used to execute tasks, and ThreadPoolExecutor and ScheduledThreadPoolExecutor are its concrete implementation classes. An ExecutorService can execute Runnable, Callable, and FutureTask tasks. A Runnable can be wrapped as a Callable together with the result to be returned, and when a Callable or FutureTask is executed, the result of the asynchronous computation is returned through a FutureTask.

There are four ways to create threads (a combined example follows this list):

Extend the Thread class to create a thread.

Thread t = new Thread(runnable)

Thread t = new Thread(new FutureTask<>(callable)) (a Callable must be wrapped in a FutureTask before a Thread can run it)

Create threads through a thread pool.
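The four styles side by side, as a compact sketch:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// The four thread-creation styles side by side.
public class ThreadCreationDemo {
    static class MyThread extends Thread {               // 1. extend Thread
        @Override public void run() { System.out.println("extend Thread"); }
    }

    public static void main(String[] args) throws Exception {
        new MyThread().start();

        Thread t1 = new Thread(() -> System.out.println("Runnable"));  // 2. Runnable
        t1.start();

        Callable<String> callable = () -> "Callable";
        FutureTask<String> task = new FutureTask<>(callable);          // 3. Callable wrapped in FutureTask
        Thread t2 = new Thread(task);
        t2.start();
        System.out.println(task.get());

        ExecutorService pool = Executors.newFixedThreadPool(2);        // 4. thread pool
        Future<String> future = pool.submit(callable);
        System.out.println(future.get());
        pool.shutdown();
    }
}
```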

ThreadPoolExecutor: Implementation class for thread pools

FixedThreadPool: both the core thread count and the maximum thread count are set to the given parameter, using an unbounded LinkedBlockingQueue as the work queue.

SingleThreadExecutor: an Executor that uses a single worker thread, with a core thread count and maximum thread count of 1, using LinkedBlockingQueue as the work queue.

CachedThreadPool: the core thread count is 0, the maximum thread count is Integer.MAX_VALUE (effectively unbounded), keepAliveTime is 60 seconds, and a SynchronousQueue is used as the work queue.

ScheduledThreadPoolExecutor extends ThreadPoolExecutor: it executes tasks (wrapped as ScheduledFutureTask) after a specified delay or periodically, and returns a ScheduledFuture.

ScheduledThreadPoolExecutor uses a DelayQueue as its work queue. The DelayQueue wraps a PriorityQueue that orders the ScheduledFutureTasks by trigger time and sequence number. When a task expires it is popped and executed, then its trigger time is updated and it is put back into the queue. Because my scheduling pool is only used for heartbeats, I made the heartbeat task a singleton and used ScheduledThreadPoolExecutor.schedule(runnable, time, timeUnit) to schedule the heartbeat.
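A sketch of such a heartbeat on a single-threaded ScheduledThreadPoolExecutor; the delays, and the additional scheduleAtFixedRate call for a recurring heartbeat, are illustrative choices:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A heartbeat scheduled on a single-threaded ScheduledThreadPoolExecutor.
// schedule(...) fires once after the delay; scheduleAtFixedRate(...) repeats.
public class HeartbeatDemo {
    private static final ScheduledThreadPoolExecutor SCHEDULER =
            new ScheduledThreadPoolExecutor(1);          // one thread is enough for heartbeats

    public static void main(String[] args) {
        Runnable heartbeat = () -> System.out.println("ping " + System.currentTimeMillis());

        // one-shot heartbeat, 5 seconds from now (the form mentioned above)
        SCHEDULER.schedule(heartbeat, 5, TimeUnit.SECONDS);

        // recurring heartbeat every 30 seconds
        SCHEDULER.scheduleAtFixedRate(heartbeat, 0, 30, TimeUnit.SECONDS);

        // call SCHEDULER.shutdown() when the heartbeat is no longer needed
    }
}
```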

ForkJoinPool

WorkStealingPool: a work-stealing thread pool, backed by ForkJoinPool, that splits large tasks into smaller ones, executes them in parallel on the CPUs, and then merges the results. At the bottom you override the compute() method to recursively divide a large task into smaller ones and call fork() to execute them separately.
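A typical RecursiveTask sketch showing the compute()/fork()/join() pattern (SumTask and its threshold are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// compute() splits the range in half until it is small enough, fork()s the
// left half, computes the right half, and join()s to merge the results.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] numbers;
    private final int from, to;

    public SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {                 // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += numbers[i];
            return sum;
        }
        int mid = (from + to) / 2;
        SumTask left = new SumTask(numbers, from, mid);
        SumTask right = new SumTask(numbers, mid, to);
        left.fork();                                  // run the left half asynchronously
        long rightSum = right.compute();              // compute the right half in this thread
        return rightSum + left.join();                // merge the two halves
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println("sum = " + total);         // 49995000
    }
}
```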

The thread pool:

The seven parameters of new ThreadPoolExecutor() (a construction example follows this list):

① corePoolSize: the number of core threads in the thread pool.

② maximumPoolSize: the maximum number of threads the thread pool can hold.

③ keepAliveTime: how long a thread beyond the core count may stay idle before it is terminated.

④ unit: the time unit of keepAliveTime.

⑤ workQueue: a blocking queue storing tasks waiting to be executed.

⑥threadFactory: the factory used to create threads.

⑦ handler: the rejection policy. When maximumPoolSize has been reached and the work queue is full, submitting another task triggers the rejection policy. Thread pools provide the following four rejection policies.

  • CallerRunsPolicy: the thread submitting the task runs the task itself
  • AbortPolicy: the default policy; throws RejectedExecutionException
  • DiscardPolicy: silently discards the task
  • DiscardOldestPolicy: discards the oldest task in the work queue
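A construction sketch with all seven parameters spelled out (the sizes, queue, and naming scheme are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// All seven parameters spelled out, with a bounded queue and an explicit policy.
public class PoolConfigDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                        // ① corePoolSize
                4,                                        // ② maximumPoolSize
                60L,                                      // ③ keepAliveTime for non-core threads
                TimeUnit.SECONDS,                         // ④ unit
                new ArrayBlockingQueue<>(100),            // ⑤ bounded workQueue
                runnable -> {                             // ⑥ threadFactory: name the threads
                    Thread t = new Thread(runnable);
                    t.setName("biz-pool-" + t.getId());
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // ⑦ rejection policy
        );

        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```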

Working principle:

Thread pools differ from pooled resources in the usual sense. With ordinary pools we call acquire() to obtain a resource when we need it and release() when we are done with it. Thread pools instead adopt a producer-consumer model: the party using the pool is the producer and the pool itself is the consumer. Internally the pool maintains a work queue and a set of worker threads. The user calls execute() to submit Runnable tasks, and execute() essentially just adds them to the work queue. When a task arrives, the pool first uses a core thread to run it; if all core threads are busy, the task is placed on the work queue; if the work queue is also full, the pool creates additional (non-core) threads; and if the maximum number of threads has been reached and the queue is still full, the rejection policy is triggered.

Thread pool size selection strategy:

For slow, I/O-heavy tasks, most of the time is spent waiting on I/O, and a thread does not consume CPU while it is blocked on I/O, so we need more threads to raise CPU utilization and do not need a large task queue (roughly 2 × CPU cores).

For high-throughput CPU-bound tasks, the goal is to maximize CPU utilization while minimizing thread switching, so the number of threads should not be too high (to reduce context-switch overhead) and a longer task queue should be used for buffering (roughly CPU cores + 1; the extra 1 keeps the CPU busy during an occasional page fault or other pause).
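The two rules of thumb, expressed as starting points that should still be validated by measurement:

```java
// Rules of thumb from the text, expressed as starting points to be tuned by measurement.
public class PoolSizing {
    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();

        int ioBoundThreads  = cpus * 2;   // I/O-heavy: more threads to keep the CPU busy
        int cpuBoundThreads = cpus + 1;   // CPU-bound: +1 covers an occasional page fault or pause

        System.out.println("I/O-bound pool size: " + ioBoundThreads);
        System.out.println("CPU-bound pool size: " + cpuBoundThreads);
    }
}
```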

Points to note when using thread pools

  • The thread pools provided by Executors either use an unbounded work queue (FixedThreadPool, SingleThreadExecutor) or allow an effectively unbounded number of threads (CachedThreadPool), either of which can easily lead to OOM.

  • The default rejection policy throws RejectedExecutionException, an unchecked exception the compiler does not force you to handle, so rejections are easy to miss; prefer a custom rejection policy (with a degradation path), as sketched below.
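A sketch of a custom rejection handler that degrades instead of throwing; here it only logs and drops the task, but it could equally persist the task or fall back to a slower path:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A custom rejection handler that degrades gracefully instead of throwing
// RejectedExecutionException: it logs and drops the task.
public class DegradingPolicyDemo {
    public static void main(String[] args) {
        RejectedExecutionHandler degrade = (task, executor) ->
                System.err.println("pool saturated, degrading: dropped " + task);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),   // bounded queue so rejection can actually happen
                degrade);

        for (int i = 0; i < 5; i++) {
            int id = i;
            pool.execute(() -> {
                try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                System.out.println("ran task " + id);
            });
        }
        pool.shutdown();
    }
}
```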

Concurrent utility classes

CountDownLatch: CountDownLatch maintains a counter underneath.

The CountDownLatch constructor takes an int: the number of threads to wait for. When countDownLatch.await() is called, the current thread blocks and waits for the other threads. Each of those threads calls countDown() to decrement the counter by one; when it reaches 0, the thread blocked at await() is woken up.
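A minimal usage sketch: the main thread waits until three workers have all called countDown():

```java
import java.util.concurrent.CountDownLatch;

// Main thread waits until three workers have all called countDown().
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);   // wait for 3 threads

        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                System.out.println("worker " + id + " done");
                latch.countDown();                      // counter - 1
            }).start();
        }

        latch.await();                                  // blocks until the counter hits 0
        System.out.println("all workers finished");
    }
}
```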

【 CyclicBarrier 】 :

Makes a group of threads block when they reach a barrier; the barrier does not open until the last thread arrives, at which point all threads are released. Each thread calls await(), blocks at the barrier, and tells the barrier it has arrived, decrementing the barrier counter by one; when the counter reaches 0, all threads are released. (Useful for multithreaded data computation; I could not use it in my project.)

[Fork/Join] :

A ForkJoinPool is used to execute ForkJoin tasks: large tasks are divided into smaller ones and placed in double-ended queues, several threads then take subtasks from the queues and execute them, and a separate thread merges the results. (The queue in my project should really be a Deque.)

【 Principle of Atomic Operations - CAS 】

How multiprocessors implement atomic operations:

① Bus locking to guarantee atomicity: if multiple processors read a variable into their caches at the same time and each writes it back to system memory after operating on it, the data ends up inconsistent. With a bus lock, the processor asserts a LOCK# signal while operating on the shared variable and the requests of other processors are blocked, so the communication between CPU and memory is locked and the processor has exclusive use of the shared resource.

② Cache locking to guarantee atomicity: the processor modifies the memory location inside its own cache and relies on the cache-coherence mechanism to guarantee the atomicity of the operation (other processors cannot simultaneously modify the same cached line).

How Java guarantees atomic operations: use CAS (compareAndSet(int expect, int update)) in a loop until it returns success.

Principle: before the update, take a snapshot of the current value of the volatile variable into the thread's working memory (the expected value). At update time, check whether the variable's current value is still equal to that snapshot; if it is, no other thread changed the value in the meantime and the update is applied. Volatile read and volatile write semantics ensure that the instructions before and after the CAS are not reordered and that the written value is flushed to shared memory immediately, which keeps the CAS valid.
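The retry loop in code, using AtomicInteger (the helper method name is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

// The classic CAS loop: snapshot the current value, compute the new value,
// and retry compareAndSet until no other thread changed it in between.
public class CasLoopDemo {
    private static final AtomicInteger counter = new AtomicInteger(0);

    public static void safeIncrement() {
        int expect;
        do {
            expect = counter.get();                             // snapshot of the volatile value
        } while (!counter.compareAndSet(expect, expect + 1));   // retry on contention
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) safeIncrement();
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter.get());                      // always 40000
    }
}
```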

CAS operation problems:

① ABA problem: CAS only checks whether the value has changed, so an A→B→A change cannot be detected. A version number solves this: the JDK's AtomicStampedReference.compareAndSet() checks both the current reference and the current stamp, and only performs the update if both are unchanged (see the sketch after this list).

② Long spin times incur high CPU overhead.

③ Atomicity across multiple variables cannot be guaranteed. AtomicReference can be used to make operations on a reference object atomic, so several shared variables can be wrapped into one reference object.
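A sketch covering points ① and ③: AtomicStampedReference guards against ABA by checking a stamp alongside the reference, and AtomicReference swaps a small immutable object that bundles several shared variables (the Range class is illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

// ① ABA: AtomicStampedReference only swaps when both the reference AND the stamp match.
// ③ Multiple variables: wrap them in one immutable object and swap it with AtomicReference.
public class CasProblemsDemo {
    static final class Range {                       // two shared variables bundled together
        final int lower, upper;
        Range(int lower, int upper) { this.lower = lower; this.upper = upper; }
    }

    public static void main(String[] args) {
        AtomicStampedReference<Integer> price =
                new AtomicStampedReference<>(100, 0);                 // value 100, stamp 0
        int stamp = price.getStamp();
        boolean ok = price.compareAndSet(100, 120, stamp, stamp + 1); // checks value AND stamp
        System.out.println("stamped CAS succeeded: " + ok);

        AtomicReference<Range> range = new AtomicReference<>(new Range(0, 10));
        Range old = range.get();
        range.compareAndSet(old, new Range(old.lower, 20));           // both fields change atomically
        System.out.println("upper = " + range.get().upper);
    }
}
```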

Atomic classes: the underlying CAS methods use Unsafe to compare and update.