This example is based on a recent multithreading project at Xiaomi: I extracted the multithreading code from the project and turned it into a standalone demo.

Yesterday I was shocked to hear the news of Yuan Lao's death. I hope Yuan Lao is at peace in heaven. I am posting a photo of Yuan Lao, who also reminds me of my grandfather, who is in heaven as well.

Preface

I have been studying Java multithreading for more than half a month. When I started, I set myself a small goal: to write a multithreaded demo. Today's article is mainly about delivering on that goal.

This example is based on a recent multithreaded asynchronous-task project at Xiaomi. I extracted the multithreading code from it and adapted it somewhat. I did not write one from scratch, partly because my own ability is not yet up to it, and partly because I wanted to learn how multithreading is implemented in a current project; at the very least, this example is used in a real project. Learn how others build a wheel, and you will know how to build your own.

Business requirements

We are building these multithreaded asynchronous tasks mainly because we have many perpetual asynchronous tasks. What is a perpetual task? One that keeps running once it has started. For example, a message push task must keep consuming unpushed messages from the DB, because new messages keep arriving, so it has to run as a perpetual asynchronous push task.

Our requirements are not complicated. In summary:

  1. Multiple perpetual asynchronous tasks can run at the same time;
  2. Each asynchronous task can open multiple threads to consume its task data;
  3. A perpetual asynchronous task can be shut down gracefully, that is, all of its pending data is consumed before it stops.

To meet these requirements, we need to pay attention to several points:

  1. Each perpetual task runs on its own thread;
  2. Each subtask needs to support concurrency, so its threads are managed by a thread pool;
  3. Shutting down a perpetual task requires notifying the subtask's concurrent threads, and both the perpetual task and its concurrent subtasks must stop gracefully.
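Points 1 and 3 above can be sketched in a few lines before looking at the real code. This is only a minimal illustration under stated assumptions: the class `PerpetualLoopSketch`, its batch size, and the counter that stands in for real work are all hypothetical and not part of the project. It shows a loop that runs on its own thread and checks a volatile stop flag only between batches, so a batch that has started is always finished.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PerpetualLoopSketch {
    private static final int BATCH_SIZE = 5; // hypothetical batch size

    // volatile: a write from the stopping thread is visible to the worker thread
    private volatile boolean stopRequested = false;
    private final AtomicLong processed = new AtomicLong();

    // Keeps processing batches; the flag is checked only between batches,
    // so a batch that has started is always finished before the loop exits.
    public void runLoop() {
        do {
            for (int i = 0; i < BATCH_SIZE; i++) {
                processed.incrementAndGet(); // stand-in for consuming one record
            }
        } while (!stopRequested);
    }

    public void requestStop() {
        stopRequested = true;
    }

    // Starts the loop on its own thread, requests a stop, and waits for the exit.
    public static long runAndStop() {
        PerpetualLoopSketch task = new PerpetualLoopSketch();
        Thread worker = new Thread(task::runLoop, "perpetual-task");
        worker.start();
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        task.requestStop();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task.processed.get();
    }

    public static void main(String[] args) {
        // The total is always a multiple of BATCH_SIZE: no batch is cut short
        System.out.println("processed = " + runAndStop());
    }
}
```

Because the flag is read only at the end of a batch, the processed total is always a whole number of batches. The project code below follows the same idea, with a thread pool doing the work inside each cycle.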

Project example

Thread pool

Subtasks need to support concurrency. If we started a new thread for every concurrent task and destroyed it when the task finished, we would waste too many resources, so we use thread pools:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava

public class TaskProcessUtil {

    // Each task has its own thread pool, keyed by pool name
    private static Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // Initialize a fixed-size thread pool
    private static ExecutorService init(String poolName, int poolSize) {
        return new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Get the thread pool, creating it on first use
    public static ExecutorService getOrInitExecutors(String poolName, int poolSize) {
        ExecutorService executorService = executors.get(poolName);
        if (null == executorService) {
            synchronized (TaskProcessUtil.class) {
                // Second check: another thread may have created the pool while we waited for the lock
                executorService = executors.get(poolName);
                if (null == executorService) {
                    executorService = init(poolName, poolSize);
                    executors.put(poolName, executorService);
                }
            }
        }
        return executorService;
    }

    // Reclaim thread resources
    public static void releaseExecutors(String poolName) {
        ExecutorService executorService = executors.remove(poolName);
        if (executorService != null) {
            executorService.shutdown();
        }
    }
}

This is a utility class for thread pools. Initializing a pool and reclaiming thread resources are straightforward, so let's focus on getting a pool. getOrInitExecutors can be called concurrently, so pool creation is guarded by a synchronized lock, and once the lock is held the executorService must be null-checked a second time. This is the same double-checked locking used in Java singleton implementations, as described in the article “Design Patterns Series 5: Singletons.”
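As a possible alternative to the hand-written double check, `ConcurrentHashMap.computeIfAbsent` (Java 8 and later) performs the "create only if missing" step atomically per key. The sketch below is not the project's code: the class name `PoolRegistrySketch` is hypothetical, and it uses `Executors.newFixedThreadPool` instead of the custom `ThreadPoolExecutor` to stay free of third-party dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolRegistrySketch {
    private static final Map<String, ExecutorService> EXECUTORS = new ConcurrentHashMap<>();

    // computeIfAbsent invokes the factory at most once per key, atomically,
    // so no explicit lock or second null check is needed here.
    public static ExecutorService getOrInit(String poolName, int poolSize) {
        return EXECUTORS.computeIfAbsent(poolName, name -> Executors.newFixedThreadPool(poolSize));
    }

    // Remove the pool from the registry and stop it from accepting new tasks
    public static void release(String poolName) {
        ExecutorService pool = EXECUTORS.remove(poolName);
        if (pool != null) {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ExecutorService first = getOrInit("demo", 2);
        ExecutorService second = getOrInit("demo", 2);
        System.out.println(first == second); // true: the same pool is returned for the same name
        release("demo");
    }
}
```

The trade-off is mostly readability: the double-checked version works on any Java version, while `computeIfAbsent` leaves the locking to the map.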

A single task

To make the implementation of a single task easier to explain, our task here simply prints Cat data. Cat is defined as follows:

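import lombok.Data;                            // Lombok: generates getters, toString(), etc.
import org.springframework.stereotype.Service; // Spring

@Data
@Service
public class Cat {
    private String catName;

    // Fluent setter: returns this so calls can be chained
    public Cat setCatName(String name) {
        this.catName = name;
        return this;
    }
}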

A single task has the following functions:

  • Obtain the task data: handled by queryData().
  • Multithreaded execution: the data is split into chunks of up to 4 items each, and the chunks are executed concurrently by multiple threads from a thread pool.
  • Graceful shutdown of a perpetual task: when a task is notified that it must shut down, it finishes the remaining task data, releases its thread resources, and then exits.
  • Perpetual execution: as long as no shutdown command has been received, the task keeps running.
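The split-then-wait step can be sketched with the JDK alone. This is an illustration under assumptions, not the project's code: `ChunkAndLatchSketch` and its methods are hypothetical names, and the hand-written `partition` stands in for Guava's `Lists.partition`. Note that partitioning by a size of 4 yields chunks of up to four elements, not four chunks, so five items produce two chunks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ChunkAndLatchSketch {
    // Splits a list into consecutive chunks of at most chunkSize elements
    public static <T> List<List<T>> partition(List<T> source, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < source.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, source.size());
            chunks.add(new ArrayList<>(source.subList(from, to)));
        }
        return chunks;
    }

    // Hands every chunk to the pool and returns only after all chunks are done
    public static int processAll(List<String> data, int chunkSize, int poolSize) {
        List<List<String>> chunks = partition(data, chunkSize);
        CountDownLatch latch = new CountDownLatch(chunks.size());
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (List<String> chunk : chunks) {
            pool.execute(() -> {
                try {
                    handled.addAndGet(chunk.size()); // stand-in for real processing
                } finally {
                    latch.countDown(); // always count down, even if processing fails
                }
            });
        }
        try {
            latch.await(); // block until every chunk has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        List<String> cats = Arrays.asList("cat0", "cat1", "cat2", "cat3", "cat4");
        System.out.println(partition(cats, 4));     // [[cat0, cat1, cat2, cat3], [cat4]]
        System.out.println(processAll(cats, 4, 3)); // 5
    }
}
```

Counting down in a `finally` block matters: if a chunk threw before reaching `countDown()`, `await()` would block forever.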

Here is the code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

import com.google.common.collect.Lists;          // Guava
import org.springframework.util.CollectionUtils; // assumption: Spring's CollectionUtils

public class ChildTask {

    private final int POOL_SIZE = 3;  // Thread pool size
    private final int SPLIT_SIZE = 4; // Chunk size: maximum number of items per chunk
    private String taskName;

    // Set when the JVM's shutdown signal is received, to achieve graceful shutdown
    protected volatile boolean terminal = false;

    public ChildTask(String taskName) {
        this.taskName = taskName;
    }

    // Program execution entry
    public void doExecute() {
        int i = 0;
        while (true) {
            System.out.println(taskName + ":Cycle-" + i + "-Begin");
            // Get the data
            List<Cat> datas = queryData();
            // Process the data
            taskExecute(datas);
            System.out.println(taskName + ":Cycle-" + i + "-End");
            if (terminal) {
                // Reached only when the application is shutting down: exit gracefully
                break;
            }
            i++;
        }
        // Reclaim thread pool resources
        TaskProcessUtil.releaseExecutors(taskName);
    }

    // Stop gracefully
    public void terminal() {
        // Signal shutdown; the loop exits after the current cycle finishes
        terminal = true;
        System.out.println(taskName + " shut down");
    }

    // Process the data of one chunk
    private void doProcessData(List<Cat> datas, CountDownLatch latch) {
        try {
            for (Cat cat : datas) {
                System.out.println(taskName + ":" + cat.toString() + ",ThreadName:" + Thread.currentThread().getName());
                Thread.sleep(1000L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always count down so that taskExecute() is never blocked forever
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    // Process the data of a single task
    private void taskExecute(List<Cat> sourceDatas) {
        if (CollectionUtils.isEmpty(sourceDatas)) {
            return;
        }
        // Split the data into chunks of up to SPLIT_SIZE items
        List<List<Cat>> splitDatas = Lists.partition(sourceDatas, SPLIT_SIZE);
        final CountDownLatch latch = new CountDownLatch(splitDatas.size());

        // Process the chunks concurrently; all chunks share one thread pool
        for (final List<Cat> datas : splitDatas) {
            ExecutorService executorService = TaskProcessUtil.getOrInitExecutors(taskName, POOL_SIZE);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    doProcessData(datas, latch);
                }
            });
        }

        try {
            // Block until every chunk has been processed
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Obtain the perpetual task's data
    private List<Cat> queryData() {
        List<Cat> datas = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            datas.add(new Cat().setCatName("Luo Xiaohei" + i));
        }
        return datas;
    }
}

A quick explanation:

  • queryData: obtains the data. In a real application, queryData should be an abstract method that each task implements for itself.
  • doProcessData: the data processing logic. In a real application, doProcessData should also be abstract, with each task providing its own implementation.
  • taskExecute: splits the data into chunks of up to SPLIT_SIZE (4) items, gets the task's thread pool, submits each chunk to the pool for concurrent execution, and blocks on latch.await(). The method returns only after every chunk has finished and the latch releases.
  • terminal: this variable is declared volatile so that a write from one thread is visible to the others. See Java Concurrent Programming Series 2 for details on volatile.
  • doExecute: the program's entry point, which encapsulates each task's execution flow. When terminal=true, the task first finishes processing the current data, then releases its thread pool, and finally exits.
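The remark that queryData and doProcessData should be abstract in a real application can be sketched as a small template-method base class. Everything here is hypothetical (`TemplateTaskSketch`, `AbstractTask`, `CollectingTask`), and the concurrent chunk processing is left out on purpose so that only the structure shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TemplateTaskSketch {

    // Hypothetical base class: the loop is fixed, the two steps are supplied by subclasses
    public abstract static class AbstractTask<T> {
        private volatile boolean terminal = false;

        protected abstract List<T> queryData();

        protected abstract void doProcessData(T item);

        // Runs cycles until terminal is set; returns the number of cycles executed.
        // The flag is checked after a cycle, so the current cycle always completes.
        public final int doExecute() {
            int cycles = 0;
            while (true) {
                for (T item : queryData()) {
                    doProcessData(item);
                }
                cycles++;
                if (terminal) {
                    break;
                }
            }
            return cycles;
        }

        public void terminal() {
            terminal = true;
        }
    }

    // Example subclass: "processing" upper-cases each item and records it
    public static class CollectingTask extends AbstractTask<String> {
        public final List<String> seen = new ArrayList<>();

        @Override
        protected List<String> queryData() {
            return Arrays.asList("a", "b");
        }

        @Override
        protected void doProcessData(String item) {
            seen.add(item.toUpperCase());
        }
    }

    public static void main(String[] args) {
        CollectingTask task = new CollectingTask();
        task.terminal(); // stop requested up front: exactly one cycle still runs
        System.out.println(task.doExecute() + " " + task.seen); // 1 [A, B]
    }
}
```

With this shape, a new perpetual task only has to say where its data comes from and what to do with each item; the loop and the shutdown check live in one place.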

Task entry

Here is the code:

import java.util.ArrayList;
import java.util.List;

import org.springframework.util.CollectionUtils; // assumption: Spring's CollectionUtils

public class LoopTask {

    private List<ChildTask> childTasks;

    // Create the perpetual tasks and start each one on its own thread
    public void initLoopTask() {
        childTasks = new ArrayList<>();
        childTasks.add(new ChildTask("childTask1"));
        childTasks.add(new ChildTask("childTask2"));
        for (final ChildTask childTask : childTasks) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    childTask.doExecute();
                }
            }).start();
        }
    }

    // Notify every task to stop gracefully
    public void shutdownLoopTask() {
        if (!CollectionUtils.isEmpty(childTasks)) {
            for (ChildTask childTask : childTasks) {
                childTask.terminal();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        LoopTask loopTask = new LoopTask();
        loopTask.initLoopTask();
        Thread.sleep(5000L);
        loopTask.shutdownLoopTask();
    }
}

Each task runs on its own thread. Here I initialize two perpetual tasks, childTask1 and childTask2, and start them separately. After sleeping for 5 seconds, I shut the tasks down, so we can see whether they exit gracefully as expected.
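The demo triggers the shutdown from main() after a sleep. In a deployed service, the signal would more likely come from the JVM itself, and one way to wire that up is a shutdown hook. The sketch below is an assumption about how this could look, not something shown in the project: `ShutdownHookSketch`, `StoppableTask`, `buildHook`, and `register` are hypothetical names. The hook sets every task's flag and then joins the worker threads, because the JVM halts once its hooks return, even if other threads are still running.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownHookSketch {

    // Minimal stand-in for ChildTask: loops until its flag is set
    public static class StoppableTask implements Runnable {
        private volatile boolean terminal = false;

        @Override
        public void run() {
            do {
                try {
                    Thread.sleep(10L); // stand-in for one cycle of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } while (!terminal);
        }

        public void terminal() {
            terminal = true;
        }
    }

    // Builds the hook thread: signal every task, then wait for the workers to finish
    public static Thread buildHook(List<StoppableTask> tasks, List<Thread> workers) {
        return new Thread(() -> {
            for (StoppableTask task : tasks) {
                task.terminal();
            }
            for (Thread worker : workers) {
                try {
                    worker.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "loop-task-shutdown-hook");
    }

    // Registers the hook; the JVM runs it on normal exit or on SIGTERM / Ctrl+C
    public static void register(List<StoppableTask> tasks, List<Thread> workers) {
        Runtime.getRuntime().addShutdownHook(buildHook(tasks, workers));
    }

    // Runs the hook body synchronously to show its effect without exiting the JVM
    public static boolean simulateShutdown() {
        List<StoppableTask> tasks = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 1; i <= 2; i++) {
            StoppableTask task = new StoppableTask();
            Thread worker = new Thread(task, "childTask" + i);
            tasks.add(task);
            workers.add(worker);
            worker.start();
        }
        buildHook(tasks, workers).run();
        for (Thread worker : workers) {
            if (worker.isAlive()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(simulateShutdown()); // true: both workers have exited
    }
}
```

In a real application, `register(tasks, workers)` would be called once at startup, right after the worker threads are started.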

Results analysis

The execution result is as follows:

childTask1:Cycle-0-Begin

childTask2:Cycle-0-Begin

ChildTask1: Cat (catName = black0),ThreadName:Pool-childTask1

ChildTask1: Cat (catName = black4),ThreadName:Pool-childTask1

ChildTask2: Cat (catName = black4),ThreadName:Pool-childTask2

ChildTask2: Cat (catName = black0),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black1),ThreadName:Pool-childTask1

ChildTask2: Cat (catName = black1),ThreadName:Pool-childTask2

ChildTask2: Cat (catName = black2),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black2),ThreadName:Pool-childTask1

ChildTask2: Cat (catName = black3),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black3),ThreadName:Pool-childTask1

childTask2:Cycle-0-End

childTask2:Cycle-1-Begin

childTask1:Cycle-0-End

childTask1:Cycle-1-Begin

ChildTask2: Cat (catName = black0),ThreadName:Pool-childTask2

ChildTask2: Cat (catName = black4),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black4),ThreadName:Pool-childTask1

ChildTask1: Cat (catName = black0),ThreadName:Pool-childTask1

childTask1 shut down

childTask2 shut down

ChildTask2: Cat (catName = black1),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black1),ThreadName:Pool-childTask1

ChildTask1: Cat (catName = black2),ThreadName:Pool-childTask1

ChildTask2: Cat (catName = black2),ThreadName:Pool-childTask2

ChildTask1: Cat (catName = black3),ThreadName:Pool-childTask1

ChildTask2: Cat (catName = black3),ThreadName:Pool-childTask2

childTask1:Cycle-1-End

childTask2:Cycle-1-End

In the output, “Pool-childTask” is the thread pool name, “childTask” is the task name, “Cat(catName=Luo Xiaohei)” is the processing result, “childTask shut down” marks the shutdown signal, and “childTask:Cycle-X-Begin” and “childTask:Cycle-X-End” mark the start and end of each cycle.

Let's analyze the results. childTask1 and childTask2 run independently. In the first cycle, each of them outputs all 5 Luo Xiaohei records normally. During the second cycle, I issued the shutdown instruction. Neither task stops immediately: each first finishes the remaining data of the current cycle (records 1 to 3 are printed after “shut down”) and only then ends the cycle and exits, which is the graceful exit we expected.
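In this design the latch guarantees that a cycle's work is finished before releaseExecutors calls shutdown(), so the pool's queue is already empty at that point. When a pool may still hold queued work at shutdown time, the JDK's `shutdown()` plus `awaitTermination()` pair gives the same finish-what-was-started behavior. The following is a minimal sketch with hypothetical names (`DrainPoolSketch`, `submitThenDrain`) and made-up timings, not code from the project.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainPoolSketch {

    // Submits taskCount slow tasks, then shuts the pool down and waits for the queue to drain
    public static int submitThenDrain(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20L); // stand-in for real work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // no new tasks are accepted; already-submitted tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(submitThenDrain(6, 2)); // 6: queued tasks finish after shutdown()
    }
}
```

`shutdown()` alone returns immediately; it is `awaitTermination()` that blocks the caller until the remaining tasks are done or the timeout expires.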

Conclusion

This is a classic example of thread pool usage, written by a colleague at our company. I think the overall flow is sound and the implementation is very elegant, so it is worth learning from.

Looking back on my Java multithreading studies, I feel I have picked things up fairly quickly. The series runs from the Java memory model, through the basics of Java multithreading and the commonly used tools, to this final hands-on practice: 8 articles in total, which really can take you from Java beginner to writing reasonably robust multithreaded programs.

In the past, when learning a language or a technology, I mostly watched and read. What matters more is practicing yourself and writing more code. So while many of my earlier articles were pure theory, I now combine theory with practice; even when I see examples online, I copy them down and run the program myself.

I plan to write another 1-2 articles on Java multithreading, and then pause this series. My goal is to learn all the technologies in the Java ecosystem, so I am moving through them as quickly as I can; once I have covered them all, I will focus on more in-depth knowledge.

For more articles, follow the WeChat official account “Lou Zai advanced road”.