Mainly based on xiaomi’s recent multithreading project, extracted from the inside of the multithreading instance.
Yesterday, I was very shocked to hear the news of the death of Yuan Lao. I hope yuan Lao can be well in heaven. I posted a photo of Yuan Lao, which also reminds me of my grandfather who has been in heaven.
preface
Java multithreaded learning, also has more than half a month, from the beginning of learning Java multithreaded, set a small goal for themselves, hope to write a multithreaded Demo, today is mainly to cash in this small goal.
The multithreading example, it is combined with the recent millet a multi-threaded asynchronous task projects, I put the abstract involves multithreaded code inside, and then some renovation, was not rewrite a on the one hand, his ability is not enough, on the other hand is to want to learn now projects multi-threading implementation posture, At least this example applies to a real project. Learn how others build a wheel, and you will know how to build your own wheel.
The business requirements
The main reason we’re doing this multithreaded asynchronous task is because we have a lot of asynchronous tasks that are permanently moving. What is permanently moving? For example, the message Push task needs to consume the unpushed messages in DB all the time because there are always messages coming, so it needs to be a permanent asynchronous Push task.
Our needs are actually not difficult, simple summary:
- The ability to execute multiple permanently moving asynchronous tasks simultaneously;
- Each asynchronous task, support to open multiple threads to consume the task data;
- Elegant shutdown of a permanently moving asynchronous task is supported. That is, all data needs to be consumed before shutdown.
To complete the above requirements, we need to pay attention to several points:
- Each permanent task, can open a thread to execute;
- Each subtask, because it needs to support concurrency, needs to be controlled by thread pools;
- The closure of a permanent task requires notifying the concurrent threads of the subtask, and supports elegant closure of both permanent and concurrent subtasks.
Project example
The thread pool
For subtasks, we need to support concurrency. If we open one thread for each concurrent task and close it when it is finished, it will consume too much resources, so we introduce thread pools:
public class TaskProcessUtil {
Each task has its own thread pool
private static Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
// Initialize a thread pool
private static ExecutorService init(String poolName, int poolSize) {
return new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
// Get the thread pool
public static ExecutorService getOrInitExecutors(String poolName,int poolSize) {
ExecutorService executorService = executors.get(poolName);
if (null == executorService) {
synchronized (TaskProcessUtil.class) {
executorService = executors.get(poolName);
if (null == executorService) {
executorService = init(poolName, poolSize);
executors.put(poolName, executorService);
}
}
}
return executorService;
}
// Reclaim thread resources
public static void releaseExecutors(String poolName) {
ExecutorService executorService = executors.remove(poolName);
if(executorService ! =null) {
executorService.shutdown();
}
}
}
Copy the code
This is a utility class for thread pools. It is easy to initialize a thread pool and reclaim thread resources. We are mainly talking about getting a thread pool. Obtaining thread pools can be concurrent, so you need to add a synchronized lock, and then, once locked, you need to perform a second null check on the executorService, much like the implementation of Java singletons, as described in this article “Design Patterns Series 5: Singletons.”
A single task
In order to better explain the implementation of a single task, our main task is to print the data of Cat. Cat is defined as follows:
@Data
@Service
public class Cat {
private String catName;
public Cat setCatName(String name) {
this.catName = name;
return this;
}
}
Copy the code
A single task has the following functions:
- QueryData () queryData() queryData()
- Multithreaded execution: Data needs to be split into 4 pieces and then executed concurrently by multiple threads, which can be supported by thread pools.
- Graceful shutdown of a permanently moving task: When a task is notified that it needs to be shut down, the remaining task data needs to be completed, and the thread resources are recovered to exit the task.
- Permanent execution: Tasks need to be executed continuously if no shutdown command is received.
Look directly at the code:
public class ChildTask {
private final int POOL_SIZE = 3; // Thread pool size
private final int SPLIT_SIZE = 4; // Data split size
private String taskName;
// Receive the SHUTDOWN signal of the JVM to achieve elegant shutdown
protected volatile boolean terminal = false;
public ChildTask(String taskName) {
this.taskName = taskName;
}
// Program execution entry
public void doExecute(a) {
int i = 0;
while(true) {
System.out.println(taskName + ":Cycle-" + i + "-Begin");
// Get the data
List<Cat> datas = queryData();
// Process the data
taskExecute(datas);
System.out.println(taskName + ":Cycle-" + i + "-End");
if (terminal) {
// Only when the application is closed, it will come here for elegant offline
break;
}
i++;
}
// Reclaim thread pool resources
TaskProcessUtil.releaseExecutors(taskName);
}
// Stop gracefully
public void terminal(a) {
/ / off the phone
terminal = true;
System.out.println(taskName + " shut down");
}
// Process the data
private void doProcessData(List<Cat> datas, CountDownLatch latch) {
try {
for (Cat cat : datas) {
System.out.println(taskName + ":" + cat.toString() + ",ThreadName:" + Thread.currentThread().getName());
Thread.sleep(1000L);
}
} catch (Exception e) {
System.out.println(e.getStackTrace());
} finally {
if(latch ! =null) {
latch.countDown();
}
}
}
// Process individual task data
private void taskExecute(List<Cat> sourceDatas) {
if (CollectionUtils.isEmpty(sourceDatas)) {
return;
}
// Split the data into four pieces
List<List<Cat>> splitDatas = Lists.partition(sourceDatas, SPLIT_SIZE);
final CountDownLatch latch = new CountDownLatch(splitDatas.size());
// Process split data concurrently, sharing a thread pool
for (final List<Cat> datas : splitDatas) {
ExecutorService executorService = TaskProcessUtil.getOrInitExecutors(taskName, POOL_SIZE);
executorService.submit(new Runnable() {
@Override
public void run(a) {
doProcessData(datas, latch);
}
});
}
try {
latch.await();
} catch (Exception e) {
System.out.println(e.getStackTrace());
}
}
// Obtain the permanent task data
private List<Cat> queryData(a) {
List<Cat> datas = new ArrayList<>();
for (int i = 0; i < 5; i ++) {
datas.add(new Cat().setCatName("Luo Xiaohei" + i));
}
return datas;
}
}
Copy the code
A quick explanation:
- QueryData: Used to obtain data, the actual application is actually need to queryData as abstract methods, and then by each task to implement their own methods.
- DoProcessData: Data processing logic. In practice, you need to make doProcessData abstract and let each task implement its own methods.
- TaskExecute: Divides the data into four pieces, gets a pool of threads for the task, gives it to the pool for concurrent execution, and blocks via latch.await(). This method returns only when all four copies of data are successfully executed and blocking is complete.
- Terminal: this variable is defined as volatile, so multithreaded memory can be seen. See Java Concurrent Programming Series 2 for details on volatile.
- DoExecute: program execution entry, encapsulating the execution process of each task. When terminal=true, the task data is executed first, the thread pool is retracted, and the terminal exits.
Task entry
Go directly to the code:
public class LoopTask {
private List<ChildTask> childTasks;
public void initLoopTask(a) {
childTasks = new ArrayList();
childTasks.add(new ChildTask("childTask1"));
childTasks.add(new ChildTask("childTask2"));
for (final ChildTask childTask : childTasks) {
new Thread(new Runnable() {
@Override
public void run(a) {
childTask.doExecute();
}
}).start();
}
}
public void shutdownLoopTask(a) {
if(! CollectionUtils.isEmpty(childTasks)) {
for (ChildTask childTask : childTasks) {
childTask.terminal();
}
}
}
public static void main(String args[]) throws Exception{
LoopTask loopTask = new LoopTask();
loopTask.initLoopTask();
Thread.sleep(5000L);
loopTask.shutdownLoopTask();
}
}
Copy the code
Each task opens a separate Thread. Here I initialize two permanent tasks, childTask1 and childTask2, and execute them separately. After five seconds of Sleep, I close the task, and we can see if we can exit gracefully as expected.
Results analysis
The execution result is as follows:
childTask1:Cycle-0-Begin
childTask2:Cycle-0-Begin
ChildTask1: Cat (catName = black0),ThreadName:Pool-childTask1
ChildTask1: Cat (catName = black4),ThreadName:Pool-childTask1
ChildTask2: Cat (catName = black4),ThreadName:Pool-childTask2
ChildTask2: Cat (catName = black0),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black1),ThreadName:Pool-childTask1
ChildTask2: Cat (catName = black1),ThreadName:Pool-childTask2
ChildTask2: Cat (catName = black2),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black2),ThreadName:Pool-childTask1
ChildTask2: Cat (catName = black3),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black3),ThreadName:Pool-childTask1
childTask2:Cycle-0-End
childTask2:Cycle-1-Begin
childTask1:Cycle-0-End
childTask1:Cycle-1-Begin
ChildTask2: Cat (catName = black0),ThreadName:Pool-childTask2
ChildTask2: Cat (catName = black4),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black4),ThreadName:Pool-childTask1
ChildTask1: Cat (catName = black0),ThreadName:Pool-childTask1
childTask1 shut down
childTask2 shut down
ChildTask2: Cat (catName = black1),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black1),ThreadName:Pool-childTask1
ChildTask1: Cat (catName = black2),ThreadName:Pool-childTask1
ChildTask2: Cat (catName = black2),ThreadName:Pool-childTask2
ChildTask1: Cat (catName = black3),ThreadName:Pool-childTask1
ChildTask2: Cat (catName = black3),ThreadName:Pool-childTask2
childTask1:Cycle-1-End
childTask2:Cycle-1-End
Copy the code
In the output data, “Pool-childTask” is the thread Pool name, “childTask” is the task name, “Cat(catName= luo Xiaohei)” is the result of execution, “childTask shut down” is the close flag, “ChildTask: cycle-x-begin” and “childTask: cycle-x-end” are the start and End markers for each Cycle.
Let’s analyze the results: ChildTask1 and childTask2 are executed respectively. In the first round of the cycle, they both output 5 pieces of Luo Xiaohei data normally. In the second round of execution, I started the close instruction.
conclusion
This is actually a classic example of thread pool use, which was written by a colleague in our company. I feel that the whole process has no problems and the implementation is very elegant, which is worth learning.
Then in the process of learning Java multithreading, I feel my current master speed is relatively fast, from the Java memory model, to the basic knowledge of Java multithreading and commonly used tools, to the final multithreading actual practice, a total of 8 articles, really can let you from the Java small white to write a more robust multithreading program.
Actually before learning a language or technology, more is to see some teachers, especially to see actually, more important is their own practice, need to write more, so before a lot of articles is pure theory, now more is combining theory and actual combat, even see some examples of online, I will Copy down and let the program run to ease again.
Java multithreading part, I plan to write another 1-2 articles later, this series will be suspended, because my goal is to learn all the Java ecology related technologies, so I eat as soon as possible, and after learning all, then focus on more in-depth knowledge.
More articles, please pay attention to the wechat public number “Lou Zai advanced road”, point attention, do not get lost ~~