Source: juejin.im/post/5d3c46d2f265da1b9163dbce (author: jin into classmate)

What? You heard that right, and you read that right... Run tasks concurrently on multiple threads, collect the results, and the sadness is gone.

Introduction

Let's look at how some apps fetch their data. A single page may need N pieces of user-behavior data, often ten or so: the number of likes, published articles, messages, follows, favorites, fans, coupons, red envelopes, and so on. That really is a lot.

Normally the page calls 10+ interfaces to get this data (because if you packed 10+ queries into one interface, the response would take an estimated half a minute). A page that needs N interfaces really wears the front-end folks out, and opening multiple threads on the front end is tiring too. We on the back end should look after our front-end colleagues; after all, as the saying goes, "programmers should not make life hard for programmers!"

Today we can return all of this data quickly through a single interface and get rid of the pain caused by serial, blocking programming ~

Multithreaded concurrent task execution and result collection

Today's protagonists are: Future, FutureTask, ExecutorService…

  • FutureTask is a simple way to get results that suits everyone; its only drawback is some CPU cost. FutureTask can also act as a latch (it implements the semantics of Future and represents an abstract computation that produces a result). It holds a Callable (the equivalent of a Runnable that produces a result) as a field and itself implements Runnable so that it can be executed. In essence, FutureTask is an asynchronous task executor that supports cancellation.
  • Callable is a callback interface whose return type is declared through a generic parameter; Runnable's run() is the method a thread executes. Both are very simple, and it is worth opening the source code to see for yourself.
  • FutureTask implements Future, which provides operations such as cancel, status queries, and get, and it also implements Runnable, so it can be handed to a thread for execution.
  • The three standard tools of Java concurrency utility classes: state, queue, CAS.
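The relationship between Callable, Runnable, and FutureTask described in the list above can be sketched as follows. This is a minimal illustration, not code from the original project; the class name FutureTaskBasics and its helper methods are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class FutureTaskBasics {

    /** Wraps the callable in a FutureTask, runs it on a plain thread, and waits for the result. */
    static <V> V runOnThread(Callable<V> callable) {
        FutureTask<V> task = new FutureTask<>(callable); // FutureTask holds the Callable
        new Thread(task).start();                        // FutureTask is itself a Runnable
        try {
            return task.get();                           // blocks until call() has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Calls run() twice on one FutureTask and reports how often the callable executed. */
    static int executionsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> counting = calls::incrementAndGet;
        FutureTask<Integer> task = new FutureTask<>(counting);
        task.run(); // executes the callable; state leaves NEW
        task.run(); // returns immediately because state is no longer NEW
        return calls.get();
    }

    public static void main(String[] args) {
        Integer answer = runOnThread(() -> 6 * 7);
        System.out.println(answer);
        System.out.println(executionsAfterTwoRuns());
    }
}
```

The second helper shows the latch-like behavior: once the task has left the NEW state, calling run() again does nothing.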

State

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL        // created -> completing -> finished normally
 * NEW -> COMPLETING -> EXCEPTIONAL   // created -> completing -> finished with an exception
 * NEW -> CANCELLED                   // created -> cancelled
 * NEW -> INTERRUPTING -> INTERRUPTED // created -> interrupting -> interrupt finished
 */
private volatile int state;
private static final int NEW          = 0; // initial state
private static final int COMPLETING   = 1; // outcome is being set
private static final int NORMAL       = 2; // finished normally
private static final int EXCEPTIONAL  = 3; // finished with an exception
private static final int CANCELLED    = 4; // cancelled
private static final int INTERRUPTING = 5; // the runner is being interrupted
private static final int INTERRUPTED  = 6; // the task was interrupted

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

A diagram makes this easier to follow:

public interface Future<T> {
    /**
     * Attempts to cancel the task.
     * @param mayInterruptIfRunning whether a task that is running but has not yet completed may be interrupted.
     * If the task is running, its thread is interrupted only when mayInterruptIfRunning is true.
     * If the task has not started yet, returns true regardless of whether mayInterruptIfRunning is true or false.
     * If the task has already completed, returns false regardless of whether mayInterruptIfRunning is true or false.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns true if the task was cancelled before it completed normally.
     */
    boolean isCancelled();

    /**
     * Whether the task is complete.
     */
    boolean isDone();

    /**
     * Obtains the execution result, blocking until it is available.
     */
    T get() throws InterruptedException, ExecutionException;

    /**
     * Obtains the execution result, blocking for at most the given time.
     * If the result is not available within that time, a TimeoutException is thrown.
     */
    T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future

  • cancel: attempts to stop execution of the task; the return value (true or false) tells you whether the cancellation succeeded.
  • get: blocks to obtain the result of the callable task, that is, get blocks the calling thread until the result is available.
  • isCancelled: whether the task was cancelled successfully.
  • isDone: whether the task is complete.

Key notes:

Future.get() obtains the result of the execution, and its behavior depends on the state of the task: if the task is complete, it returns the result immediately; otherwise it blocks until the task completes and then returns the result or throws an exception.

"Completed" covers every way the computation can end: normal completion, cancellation, and an exception. Once a task enters a completed state, it stays there forever. As long as state is not NEW, isDone() reports the task as completed.
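The three ways of ending described above can be observed through the public Future API alone. The following is a small sketch; the class CompletionStates and its helper methods are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CompletionStates {

    /** Describes how a task ended, using only isDone() and get(). */
    static String describe(FutureTask<String> task) {
        if (!task.isDone()) {
            return "not done";                   // state is still NEW
        }
        try {
            return "normal: " + task.get();      // finished normally
        } catch (CancellationException e) {
            return "cancelled";                  // ended by cancel()
        } catch (ExecutionException e) {
            return "exception: " + e.getCause().getMessage(); // call() threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted while waiting";
        }
    }

    static FutureTask<String> normalTask() {
        FutureTask<String> task = new FutureTask<String>(() -> "ok");
        task.run();
        return task;
    }

    static FutureTask<String> failedTask() {
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        FutureTask<String> task = new FutureTask<>(failing);
        task.run(); // the exception is stored in the task, not thrown here
        return task;
    }

    static FutureTask<String> cancelledTask() {
        FutureTask<String> task = new FutureTask<String>(() -> "never runs");
        task.cancel(false); // NEW -> CANCELLED
        task.run();         // no effect: state is no longer NEW
        return task;
    }

    public static void main(String[] args) {
        System.out.println(describe(new FutureTask<String>(() -> "unused")));
        System.out.println(describe(normalTask()));
        System.out.println(describe(failedTask()));
        System.out.println(describe(cancelledTask()));
    }
}
```

In all three finished cases isDone() returns true; only get() reveals which kind of ending occurred.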

FutureTask is responsible for passing the result of a computation from the thread that executes the task to the thread that asks for it, and for ensuring that the result is safely published along the way.

FutureTask relies on UNSAFE and its lock-free (CAS) programming technique to ensure thread safety. To limit the CPU cost of lock-free spinning, it uses the state flag to reduce CPU pressure while idling.

  • Source of the task: callable
  • Executor of the task: runner
  • Result of the task: outcome
  • Getting the result of the task: state + outcome + waiters
  • Interrupting or cancelling the task: state + runner + waiters

The run method

1. Check state. If it is not NEW, the task has already been started (or cancelled), so return immediately. Otherwise CAS runner from null to the current thread; continue on success, return on failure.

2. Call callable.call() to execute the task. On success call set(result); on failure call setException(ex). Both set the final state and call finishCompletion(), which wakes up the threads blocked in get().

3. As the comment in the code asks: what happens if you drop the ran variable and move "set(result);" into the try block in place of "ran = true;"? At first glance the logic is unchanged, but consider what happens if set(result) itself throws an exception or even an Error. set() eventually calls the user-overridable done() method, and anything thrown there would be caught by catch (Throwable ex) and handed to setException(ex), confusing a failure in done() with a failure of the task itself. So ran cannot be omitted.

4. If state is INTERRUPTING, voluntarily yield the CPU and spin until the other thread finishes the interrupt. See the handlePossibleCancellationInterrupt(int s) method.

public void run() {
    // The CAS on runner guarantees that the callable is executed only once (lock-free programming)
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) { // the task is not null and state is still the initial value
            V result;
            boolean ran;
            try {
                result = c.call(); // execute the task
                ran = true;
            } catch (Throwable ex) {
                result = null;     // exception: clear the result
                ran = false;       // mark as failed
                setException(ex);
            }
            if (ran) // question: can ran be omitted and set(result) moved into the try block?
                set(result);
        }
    } finally {
        // runner must stay non-null until state is settled, to prevent concurrent calls to run()
        runner = null;
        int s = state;
        if (s >= INTERRUPTING) // another thread is interrupting this one: yield the CPU and spin
            handlePossibleCancellationInterrupt(s);
    }
}

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING) // another thread is in the middle of interrupting this thread
            Thread.yield();           // give up the CPU and spin until the interrupt finishes
}
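To see why set(result) ends up running user code, here is a small sketch that overrides the protected done() hook of FutureTask and records when it fires. The class DoneHookDemo and the nested RecordingTask are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class DoneHookDemo {

    /** A FutureTask that records the moment its done() hook is invoked. */
    static class RecordingTask extends FutureTask<String> {
        private final List<String> events;

        RecordingTask(Callable<String> callable, List<String> events) {
            super(callable);
            this.events = events;
        }

        @Override
        protected void done() {
            // Invoked by FutureTask once the task has reached a final state
            events.add("done, isDone=" + isDone());
        }
    }

    /** Runs a task on the calling thread and returns the recorded order of events. */
    static List<String> runAndRecord() {
        List<String> events = new ArrayList<>();
        Callable<String> work = () -> {
            events.add("call");
            return "result";
        };
        RecordingTask task = new RecordingTask(work, events);
        task.run();              // call() -> set(result) -> done()
        events.add("after run");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runAndRecord());
    }
}
```

Because done() runs inside run(), an exception thrown by an overriding done() would surface from the set(result) call, which is the situation step 3 above is concerned with.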

To recap, the run method does the following:

  • Sets the runner field to the thread currently executing run.
  • Calls the call method of the callable field to execute the task.
  • Stores the execution result in outcome: the result itself on success, or the exception if one was thrown. Before outcome is written, state is set to the intermediate state COMPLETING.
  • After outcome is assigned, sets state to NORMAL or EXCEPTIONAL.
  • Wakes up all waiting threads in the Treiber stack.
  • Sets waiters, callable, and runner to null.
  • Checks for a missed interrupt and, if there is one, waits for the interrupt to complete.

And of course we cannot skip the get method, which blocks until the result is ready. The blocking itself happens in awaitDone.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)          // the task is not finished yet
        s = awaitDone(false, 0L); // wait for it
    return report(s);             // report the result
}

get(long, TimeUnit) is the variant that waits with a timeout:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // The task is still executing, so wait; if it is still not finished after the wait, time out
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s); // report the result
}
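From the caller's side, the timed get looks like the sketch below. The class TimedGetDemo, the fetch helper, and the chosen durations are all hypothetical; the point is only that a TimeoutException is thrown when the result does not arrive in time.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    /** Waits at most timeoutMillis for a task that takes taskMillis; returns its value or "timeout". */
    static String fetch(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // simulated slow query
                return "value";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true);      // interrupt the task we no longer wait for
                return "timeout";
            } catch (ExecutionException e) {
                return "failed: " + e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(10, 2000));  // the task finishes well within the timeout
        System.out.println(fetch(2000, 50));  // the timeout expires first
    }
}
```

Cancelling after a timeout is a design choice of this sketch, not something get(long, TimeUnit) does on its own.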

Next, look at awaitDone. Anyone who knows to write an infinite loop as while (true) or for (;;) is a master ~

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // compute the deadline
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { // the waiting thread was interrupted: leave the queue
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {       // the task has reached a final state
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)   // the result is being set: yield briefly
            Thread.yield();
        else if (q == null)         // not finished yet: create a wait node
            q = new WaitNode();
        else if (!queued)           // not in the queue yet: enqueue with CAS
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {      // timed out
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // block the thread
    }
}
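The core of awaitDone is a loop that re-checks the state and parks the thread until someone unparks it. Below is a heavily simplified sketch of that pattern for a single waiting thread, using a volatile flag instead of the real state machine. ParkingResultDemo and ParkingResult are hypothetical names; FutureTask itself supports many waiters through its waiters stack.

```java
import java.util.concurrent.locks.LockSupport;

class ParkingResultDemo {

    /** One-shot result holder that supports exactly one waiting thread. */
    static final class ParkingResult<V> {
        private V value;                // written before done, read after done
        private volatile boolean done;  // plays the role of "state > COMPLETING"
        private volatile Thread waiter; // the single thread allowed to wait

        V await() throws InterruptedException {
            waiter = Thread.currentThread();
            try {
                for (;;) {
                    if (Thread.interrupted())
                        throw new InterruptedException();
                    if (done)
                        return value;
                    LockSupport.park(this); // may wake up spuriously, hence the loop
                }
            } finally {
                waiter = null;
            }
        }

        void complete(V v) {
            value = v;
            done = true;                 // volatile write publishes value
            Thread t = waiter;
            if (t != null)
                LockSupport.unpark(t);   // wake the waiter, like finishCompletion()
        }
    }

    /** Starts a producer that completes the result after a short delay, then waits for it. */
    static String demo() {
        ParkingResult<String> result = new ParkingResult<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            result.complete("ready");
        });
        producer.start();
        try {
            return result.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If complete() runs before the waiter parks, the unpark permit is kept, so the following park returns immediately and the loop sees done == true.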

That is enough about task scheduling and get; I will not ramble on. There is a lot more worth exploring, but time is short, so I will leave it there ~

Queue

Next we look at the queue. In FutureTask, the queue is implemented as a singly linked list that represents the collection of all threads waiting for the task to complete. As we know, FutureTask implements the Future interface so that the execution result of a task can be retrieved. What happens if the task has not finished when the result is requested? The thread requesting the result is suspended in a wait queue until it is woken up after the task completes. This is similar to the sync queue in AQS, and you can compare the similarities and differences yourself in the analysis below.

As mentioned earlier, using a queue in concurrent programming usually means wrapping the current thread in some data structure and putting it into a wait queue. Let's first look at the structure of each node in the queue:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

As you can see, WaitNode is much simpler than the node of the doubly linked list used by the AQS sync queue: it contains only a thread field and a next field pointing to the next node.

Note that the singly linked list in FutureTask is used as a stack, specifically a Treiber stack. If you don't know what a Treiber stack is, simply think of it as a thread-safe stack that uses CAS to perform push and pop (see this article for more information).

A thread-safe stack is needed because several threads may request the result of the task at the same time. If the task is still in progress, each of these threads is wrapped in a WaitNode and pushed onto the top of the Treiber stack. CAS operations are therefore required to keep push and pop thread-safe.
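For readers who have not met a Treiber stack before, here is a minimal generic version built on AtomicReference. It is a sketch of the technique rather than FutureTask's own code, which applies CAS directly to its waiters field through Unsafe; the class TreiberStack and the helper concurrentPushCount are names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

class TreiberStack<T> {

    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // Pointer to the top node; every change goes through CAS
    private final AtomicReference<Node<T>> top = new AtomicReference<>();

    void push(T item) {
        Node<T> node = new Node<>(item);
        Node<T> current;
        do {
            current = top.get();
            node.next = current;                      // link to the old top
        } while (!top.compareAndSet(current, node));  // retry if another thread won
    }

    /** Returns the most recently pushed item, or null if the stack is empty. */
    T pop() {
        Node<T> current;
        do {
            current = top.get();
            if (current == null)
                return null;
        } while (!top.compareAndSet(current, current.next));
        return current.item;
    }

    /** Pushes from several threads at once and returns how many items end up on the stack. */
    static int concurrentPushCount(int threads, int perThread) {
        TreiberStack<Integer> stack = new TreiberStack<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    stack.push(i);
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        int count = 0;
        while (stack.pop() != null)
            count++;
        return count;
    }

    public static void main(String[] args) {
        System.out.println(concurrentPushCount(4, 1000));
    }
}
```

No push is lost even though no lock is taken: a failed compareAndSet simply means another thread changed the top first, and the loop retries against the new top.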

Because the queue in FutureTask is essentially a Treiber stack, all it needs is a pointer to the top node, which in FutureTask is the waiters field:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

In fact, it is the head node of the entire one-way list.

To sum up, the queue structure used in FutureTask is as follows:

CAS operations

Most CAS operations are used to change state, and FutureTask is no exception. We typically initialize offsets for properties that require CAS operations in static blocks:

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

As you can see from the static block, CAS operations target three fields: state, runner, and waiters. This tells us that these three fields are the ones accessed by multiple threads at the same time. The state field represents the state of the task, and the waiters field is the pointer to the top node of the stack; both were analyzed above.

The runner field represents the thread that executes the task in FutureTask. Why is a field needed to record the thread executing the task? It is there in preparation for interrupting or cancelling the task: we can only interrupt the task if we know which thread is executing it.

With the field offsets defined, the next step is the CAS operation itself. In FutureTask, CAS operations ultimately call the compareAndSwapXXX methods of the Unsafe class. The Unsafe implementation itself is not covered here.
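The NEW -> COMPLETING -> NORMAL transition performed by set() can be imitated without Unsafe. The sketch below swaps in AtomicInteger (compareAndSet and lazySet) for the compareAndSwapInt and ordered-write calls; it is an illustration under that assumption, not FutureTask's implementation, and the class name CasStateDemo is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasStateDemo {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private Object outcome; // guarded by the state transitions

    /** Only the thread that wins the NEW -> COMPLETING CAS may write the outcome. */
    boolean set(Object value) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = value;
            state.lazySet(NORMAL); // cheaper ordered write into the final state
            return true;
        }
        return false;
    }

    int currentState() {
        return state.get();
    }

    /** Lets several threads race to set the result and counts how many succeed. */
    static int winners(int threads) {
        CasStateDemo task = new CasStateDemo();
        AtomicInteger wins = new AtomicInteger();
        Thread[] racers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            racers[i] = new Thread(() -> {
                if (task.set("from thread " + id))
                    wins.incrementAndGet();
            });
            racers[i].start();
        }
        for (Thread racer : racers) {
            try {
                racer.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return wins.get();
    }

    public static void main(String[] args) {
        System.out.println(winners(8));
    }
}
```

However many threads call set(), exactly one CAS from NEW succeeds, so the outcome is written once and the state can never move backwards.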

Practice

Explanation without examples is just playing the rogue, so let's add an example drawn from real life…

The hands-on project uses Spring Boot as its scaffolding. GitHub address:

Github.com/javastacks/…

1. MyFutureTask implementation class

The class defines a thread pool internally for task scheduling, thread management, and thread reuse; configure it to suit your own project.

The thread pool in this example is configured as follows:

  • Core threads: 8
  • Maximum threads: 20
  • Keep-alive time: 30s
  • Queue capacity: 10
  • Daemon threads
  • Rejection policy: hand overloaded tasks back to the caller

Description:

By default, tasks run on the core threads (8). When more tasks arrive than there are core threads, the extra tasks are put into the queue. When the queue (10) is full, new threads are started, up to the maximum of 20. Once that limit is reached as well, the rejection policy runs the task on the calling thread.
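The order "core threads, then queue, then extra threads, then rejection policy" can be watched on a scaled-down pool. The sketch below uses 1 core thread, 2 maximum threads, and a queue of 1 instead of the 8/20/10 of the article; the class PoolGrowthDemo and these smaller numbers are assumptions made to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /** Submits four tasks to a tiny pool and logs pool size and queue length after each. */
    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();

        pool.execute(blocker); // taken by the core thread
        log.add("task1 pool=" + pool.getPoolSize() + " queue=" + pool.getQueue().size());
        pool.execute(blocker); // core thread busy: goes into the queue
        log.add("task2 pool=" + pool.getPoolSize() + " queue=" + pool.getQueue().size());
        pool.execute(blocker); // queue full: a second thread is started
        log.add("task3 pool=" + pool.getPoolSize() + " queue=" + pool.getQueue().size());

        // Queue full and maximum reached: CallerRunsPolicy runs the task on this thread
        String caller = Thread.currentThread().getName();
        String[] ranOn = new String[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        log.add("task4 ranOnCaller=" + caller.equals(ranOn[0]));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

With CallerRunsPolicy an overloaded pool slows the submitting thread down instead of dropping work, which is why the article describes it as handing the task back to the caller.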

import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * @author Lijing
 * @date 29 July 2019
 */
@Slf4j
@Component
public class MyFutureTask {

    @Resource
    UserService userService;

    /**
     * Core threads 8, maximum threads 20, keep-alive time 30s, queue capacity 10,
     * daemon threads, rejection policy: hand overloaded tasks back to the caller
     */
    private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
            30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
            new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @SuppressWarnings("all")
    public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {

        System.out.println("MyFutureTask thread: " + Thread.currentThread());

        long fansCount = 0, msgCount = 0, collectCount = 0,
                followCount = 0, redBagCount = 0, couponCount = 0;

        // Serial version (commented out):
        // fansCount = userService.countFansCountByUserId(userId);
        // msgCount = userService.countMsgCountByUserId(userId);
        // collectCount = userService.countCollectCountByUserId(userId);
        // followCount = userService.countFollowCountByUserId(userId);
        // redBagCount = userService.countRedBagCountByUserId(userId);
        // couponCount = userService.countCouponCountByUserId(userId);

        try {
            Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
            Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
            Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
            Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
            Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
            Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

            // get blocks until each result is available
            fansCount = fansCountFT.get();
            msgCount = msgCountFT.get();
            collectCount = collectCountFT.get();
            followCount = followCountFT.get();
            redBagCount = redBagCountFT.get();
            couponCount = couponCountFT.get();

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            log.error(">>>>>> Exception while aggregating user information: " + e + " <<<<<<<<<");
        }

        UserBehaviorDataDTO userBehaviorData =
                UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                        .collectCount(collectCount).followCount(followCount)
                        .redBagCount(redBagCount).couponCount(couponCount).build();
        return userBehaviorData;
    }
}

2. Service methods

These are ordinary business query methods. We add a delay to each one so that the effect is easy to see.

import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    UserMapper userMapper;

    @Override
    public long countFansCountByUserId(Long userId) {
        try {
            Thread.sleep(10000);
            System.out.println("Get FansCount === sleep: " + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("UserService gets FansCount on thread " + Thread.currentThread().getName());
        return 520;
    }

    @Override
    public long countMsgCountByUserId(Long userId) {
        System.out.println("UserService gets MsgCount on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("Get MsgCount === sleep: " + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 618;
    }

    @Override
    public long countCollectCountByUserId(Long userId) {
        System.out.println("UserService gets CollectCount on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("Get CollectCount === sleep: " + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 6664;
    }

    @Override
    public long countFollowCountByUserId(Long userId) {
        System.out.println("UserService gets FollowCount on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("Get FollowCount === sleep: " + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return userMapper.countFollowCountByUserId(userId);
    }

    @Override
    public long countRedBagCountByUserId(Long userId) {
        System.out.println("UserService gets RedBagCount on thread " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(4);
            System.out.println("Get RedBagCount === sleep: " + 4 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 99;
    }

    @Override
    public long countCouponCountByUserId(Long userId) {
        System.out.println("UserService gets CouponCount on thread " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(8);
            System.out.println("Get CouponCount === sleep: " + 8 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 66;
    }
}

3. Controller call

/**
 * @author LiJing
 * @ClassName: UserController
 * @Description: User controller
 * @date 2019/7/29 15:16
 */
@RestController
@RequestMapping("user/")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired
    private MyFutureTask myFutureTask;

    @GetMapping("/index")
    @ResponseBody
    public String index() {
        return "~~~~~~~~";
    }

    // http://localhost:8080/api/user/get/data?userId=4
    @GetMapping("/get/data")
    @ResponseBody
    public UserBehaviorDataDTO getUserData(Long userId) {
        System.out.println("UserController thread: " + Thread.currentThread());
        long begin = System.currentTimeMillis();
        UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
        long end = System.currentTimeMillis();
        System.out.println("=============== total time: " + (end - begin) / 1000.0 + " seconds");
        return userAggregatedResult;
    }
}

Start the project and call http://localhost:8080/api/user/get/data?userId=4

With the thread pool configured as core threads 8, maximum threads 20, keep-alive time 30 seconds, and queue capacity 10, the test results are as follows:

Result: every service method runs on a thread created by the thread pool, with a name of the form User_Async_FutureTask-%d, and the total elapsed time drops from a cumulative 52 seconds to 10 seconds, which is the time taken by the slowest query.

Now let's uncomment the serial code and test the serial query:

Result: with the serial query the total time is 52 seconds, which is terrible.
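The same comparison can be reproduced without Spring on a much smaller scale. In the sketch below the three "queries" are stand-ins that sleep for 200, 200, and 80 milliseconds; the class SerialVsConcurrent, the slowCount helper, and these durations are assumptions chosen for the example, not the project's code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SerialVsConcurrent {

    /** Stand-in for a slow service query: sleeps, then returns the given value. */
    static long slowCount(long value, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /** Runs the queries one after another; returns {sum, elapsedMillis}. */
    static long[] serial() {
        long start = System.nanoTime();
        long sum = slowCount(520, 200) + slowCount(618, 200) + slowCount(99, 80);
        return new long[] { sum, (System.nanoTime() - start) / 1_000_000 };
    }

    /** Submits all queries first, then collects the results; returns {sum, elapsedMillis}. */
    static long[] concurrent() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            Future<Long> fans = pool.submit(() -> slowCount(520, 200));
            Future<Long> msgs = pool.submit(() -> slowCount(618, 200));
            Future<Long> redBags = pool.submit(() -> slowCount(99, 80));
            long sum = fans.get() + msgs.get() + redBags.get(); // each get blocks
            return new long[] { sum, (System.nanoTime() - start) / 1_000_000 };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] s = serial();
        long[] c = concurrent();
        System.out.println("serial:     sum=" + s[0] + " time=" + s[1] + "ms");
        System.out.println("concurrent: sum=" + c[0] + " time=" + c[1] + "ms");
    }
}
```

The serial time is roughly the sum of the delays, while the concurrent time is roughly the longest single delay, mirroring the 52 seconds versus 10 seconds measured above.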

Conclusion

With FutureTask, each task is submitted as a Callable whose call() method is run by a pool thread, and the caller blocks on get() to obtain each result. Finally we aggregate the results, which completes a business method built on asynchronous multi-threaded calls.

Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
        return userService.countFansCountByUserId(userId);
    }
});

This is only a simple example; in a real project you can define whatever business methods you need and merge their results. As of JDK 1.8 there are also ExecutorCompletionService, ForkJoinTask, and CompletableFuture, all of which can implement the approach above. We will write up some cases using them later, so stay tuned. If you find shortcomings in this article, corrections are welcome ~
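As a taste of the CompletableFuture alternative mentioned above, here is a minimal sketch that fetches three counts concurrently and waits for all of them. The class CompletableAggregation and the hard-coded values standing in for service calls are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletableAggregation {

    /** Runs three stand-in queries concurrently and returns {fans, messages, coupons}. */
    static long[] aggregate() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Long> fans = CompletableFuture.supplyAsync(() -> 520L, pool);
            CompletableFuture<Long> msgs = CompletableFuture.supplyAsync(() -> 618L, pool);
            CompletableFuture<Long> coupons = CompletableFuture.supplyAsync(() -> 66L, pool);

            // allOf completes once every future has completed; join() waits for it
            CompletableFuture.allOf(fans, msgs, coupons).join();

            // These join() calls return immediately because all futures are done
            return new long[] { fans.join(), msgs.join(), coupons.join() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(aggregate()));
    }
}
```

Compared with Future.get(), join() throws only unchecked exceptions, so the aggregation code needs no try/catch for InterruptedException or ExecutionException.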

A small dessert

Finally, a word on asynchronous programming with our beloved Spring. There are many flavors of asynchronous programming, such as the common Future, CompletableFuture.supplyAsync, and @Async. Ha, in the end none of them can do without Thread.start()… Let me tell a little story:

Dad has two children, Xiao Hong and Xiao Ming. Dad wants a drink, so he asks Xiao Hong to buy wine, and Xiao Hong goes out. Then Dad suddenly wants to smoke, so he asks Xiao Ming to buy cigarettes. In object-oriented thinking, going out to buy something and bringing it back is usually treated as one method. If the calls are executed sequentially, or synchronized across threads, Xiao Ming must wait until Xiao Hong has finished buying the wine before he can go and buy the cigarettes. That definitely adds to the time cost (what if Dad has a full bladder?). Asynchrony is designed to solve exactly this kind of problem. You can give instructions to Xiao Hong and Xiao Ming at the same time, let them go shopping, get on with your own business, and simply receive the results when they come back.

package com.boot.lea.mybot.futrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName: TestFuture
 * @Description:
 * @author LiJing
 * @date 2019/8/5 15:16
 */
@SuppressWarnings("all")
public class TestFuture {

    // Thread pool with two threads
    static ExecutorService executor = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws InterruptedException {
        // Xiao Hong buys wine. future2 represents what will happen when Xiao Hong goes shopping;
        // its value is the result of her shopping
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Dad: Xiao Hong, go and buy a bottle of wine!");
            try {
                System.out.println("Xiao Hong went out to buy wine; the girl runs slowly, so she should be back in about 5s...");
                Thread.sleep(5000);
                return "I bought it!";
            } catch (InterruptedException e) {
                System.err.println("Xiao Hong ran into trouble on the way");
                return "See you in the afterlife!";
            }
        }, executor);

        // Xiao Ming buys cigarettes. future1 represents what will happen when Xiao Ming goes shopping;
        // its value is the result of his shopping
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Dad: Xiao Ming, go and buy a pack of cigarettes!");
            try {
                System.out.println("Xiao Ming went out to buy cigarettes; he should be back in about 3s...");
                Thread.sleep(3000);
                throw new InterruptedException();
                // return "I bought it!";
            } catch (InterruptedException e) {
                System.out.println("Xiao Ming ran into trouble on the way");
                return "This is a message from me. I am no longer here.";
            }
        }, executor);

        // Callbacks that run when each child comes back
        future2.thenAccept((e) -> {
            System.out.println("Xiao Hong says: " + e);
        });
        future1.thenAccept((e) -> {
            System.out.println("Xiao Ming says: " + e);
        });

        System.out.println("Dad: waiting and waiting... the beautiful scenery of West Lake in March......");
        System.out.println("Dad: I got bored and even went to the bathroom.");
        Thread.currentThread().join(9 * 1000);
        System.out.println("Dad: finally, they bought me...... the wine");

        // Close the thread pool
        executor.shutdown();
    }
}

Running results:
