Preface


In the first two articles we studied queues and threads in Libuv, laying the foundation for this article; if you have not read them yet, it is recommended to do so first. This article studies Libuv's thread pool from the perspective of the producer-consumer model and of the source code, paving the way for learning how Libuv handles files.


Producer-consumer model


Node.js supports both synchronous and asynchronous file operations. According to the introduction on Libuv's official website, there is no portable cross-platform asynchronous file I/O for it to use, so asynchronous file I/O is implemented by running synchronous file I/O in a thread pool. How does that work? The answer is the producer-consumer model. Libuv's threads consist of two parts: the main thread and the thread pool. Part of the main thread's job is to describe a task and submit it to the thread pool, which then processes it. Take an asynchronous file operation as an example: the main thread creates an object describing the file operation and submits it to the task queue, and the thread pool retrieves the object from the task queue and processes it. The main thread is the producer, the threads in the thread pool are the consumers, and the task queue is the bridge between them. Here is a simple schematic diagram:

Libuv adds one step to the producer-consumer model: after a thread in the pool finishes a task, it hands the result back to the main thread, and the main thread, on receiving the result, executes the task's callback if one is registered. So Libuv's threading model is as follows:
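To make the model concrete, here is a minimal producer-consumer sketch. It is not Libuv's internal code; it only uses Libuv's portable threading primitives (uv_mutex_t, uv_cond_t, uv_thread_t) to show the pattern: the main thread enqueues task ids under a mutex and signals a condition variable, while a consumer thread waits on that condition variable and takes tasks off the queue. The result-notification step back to the main thread is omitted here; it is covered later in the callback-processing section.

#include <stdio.h>
#include <uv.h>

#define TASK_COUNT 5

static uv_mutex_t mutex;
static uv_cond_t cond;
static int queue[TASK_COUNT];   // a toy "task queue": just task ids
static int head = 0, tail = 0;

// consumer: plays the role of a thread-pool worker
static void consumer(void* arg) {
  (void) arg;
  for (int done = 0; done < TASK_COUNT; done++) {
    uv_mutex_lock(&mutex);
    while (head == tail)                   // queue empty: sleep until woken up
      uv_cond_wait(&cond, &mutex);
    int task = queue[head++];
    uv_mutex_unlock(&mutex);
    printf("consumed task %d\n", task);    // "execute" the task
  }
}

int main(void) {
  uv_thread_t tid;
  uv_mutex_init(&mutex);
  uv_cond_init(&cond);
  uv_thread_create(&tid, consumer, NULL);

  // producer: plays the role of the main thread submitting tasks
  for (int i = 0; i < TASK_COUNT; i++) {
    uv_mutex_lock(&mutex);
    queue[tail++] = i;                     // enqueue a task
    uv_cond_signal(&cond);                 // wake a waiting consumer
    uv_mutex_unlock(&mutex);
  }

  uv_thread_join(&tid);
  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);
  return 0;
}

Libuv's real task queue is an intrusive linked list rather than an array, and its workers loop forever instead of exiting after a fixed number of tasks, but the locking and signalling pattern is the same.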


Source code analysis


The code for the Libuv thread pool is easy to find: it lives in the threadpool.c file in the src directory.



Following the introduction to the producer-consumer model above, this code can be roughly divided into four parts: the task queue; the main thread submitting tasks to the task queue (submitting tasks); the thread pool taking tasks from the task queue and executing them (consuming tasks); and the thread pool notifying the main thread to run the callback after a task has finished (callback processing).


Task queue


A task queue is just a queue. Because the task queue can be accessed by multiple threads at the same time (the main thread and the thread pool), a mutex is required to ensure thread safety. In addition, if the task queue is empty, the threads in the thread pool need to be suspended and then woken up by the main thread after it submits a task, so a condition variable is also required. The task queue, condition variable, and mutex are defined as follows:


...
static uv_cond_t cond;   // condition variable
static uv_mutex_t mutex; // mutex
...
static QUEUE wq;         // task queue
...


Submit a task


The main thread submits tasks to the queue via uv__work_submit. Let’s look at the code:


struct uv__work {
  void (*work)(struct uv__work* w);
  void (*done)(struct uv__work* w, int status);
  struct uv_loop_s* loop;
  void* wq[2];  // used to link the task into the task queue
};

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);  // initialize the thread pool (only once)
  w->loop = loop;
  w->work = work;             // function executed in the thread pool
  w->done = done;             // callback executed in the main thread afterwards
  post(&w->wq, kind);         // put the task into the task queue
}


uv__work_submit takes five parameters. The first is the Libuv event loop, which we will ignore for now; a dedicated article will cover it later. The second is a generic description of the task to be executed by the thread pool, of type struct uv__work: its work field is the function executed in the thread pool, and its done field is the callback the main thread runs after the work has finished. The third, kind, indicates what kind of task is being submitted (for example CPU-bound versus slow I/O). The fourth and fifth arguments are the work and done functions themselves. uv__work_submit does two things: it initializes the thread pool by calling init_once through uv_once, and it fills in w and posts it to the task queue. Note that uv_once guarantees that even if uv__work_submit is called many times, init_once is executed only once; on Unix, uv_once is implemented via pthread_once. init_once is covered in the next section. Before moving on to post, here is a quick look at how the same work/done split appears in Libuv's public API.
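A minimal usage sketch of uv_queue_work, the public wrapper that hands a work/done pair to the thread pool through this same submission path (assuming the default loop):

#include <stdio.h>
#include <uv.h>

// runs in a thread-pool thread: the "work" part
static void work_cb(uv_work_t* req) {
  printf("running in the thread pool\n");
}

// runs in the main (loop) thread after the work finishes: the "done" part
static void after_work_cb(uv_work_t* req, int status) {
  printf("back in the main thread, status = %d\n", status);
}

int main(void) {
  uv_work_t req;
  uv_queue_work(uv_default_loop(), &req, work_cb, after_work_cb);
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  return 0;
}

Now let's look at post: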


static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  ...
  // insert the task at the tail of the task queue
  QUEUE_INSERT_TAIL(&wq, q);

  // if there are idle (suspended) threads in the pool, wake one up to work
  if (idle_threads > 0)
    uv_cond_signal(&cond);

  uv_mutex_unlock(&mutex);
}


The code is simple: acquire the mutex, then insert the task at the tail of the task queue. If there are idle threads in the thread pool, one of them is woken up through the condition variable cond to do the work; finally, the mutex is released.


Consuming tasks


Tasks in the task queue are consumed by the thread pool, which is initialized by the uv__work_submit call to init_once.


static void init_once(void) {
  ...
  init_threads();
}


init_once calls init_threads, so let's look at init_threads next.


...
#define MAX_THREADPOOL_SIZE 1024  // maximum size of the thread pool
...
static uv_thread_t* threads;
static uv_thread_t default_threads[4];  // the default thread pool holds 4 threads
...

static void init_threads(void) {
  unsigned int i;
  const char* val;
  ...
  // default size, overridable via the UV_THREADPOOL_SIZE environment variable
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);

  // keep at least one thread in the pool
  if (nthreads == 0)
    nthreads = 1;

  // the number of threads cannot exceed MAX_THREADPOOL_SIZE
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  // create the condition variable
  if (uv_cond_init(&cond))
    abort();

  // create the mutex
  if (uv_mutex_init(&mutex))
    abort();

  // initialize the task queue
  QUEUE_INIT(&wq);
  ...

  // create each thread in the pool; every thread runs the worker function
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  ...
}


init_threads first determines the size of the thread pool, nthreads: it defaults to 4, can be overridden through the UV_THREADPOOL_SIZE environment variable, and is capped at MAX_THREADPOOL_SIZE. It then initializes the mutex, the condition variable cond, and the task queue wq, and finally creates nthreads threads, each of which runs the worker function. The core of worker is consuming tasks from the task queue; a small sketch of configuring the pool size follows, and then we examine worker in detail.
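Because init_threads reads UV_THREADPOOL_SIZE only once, when the pool is first created, the variable must be set before the very first task is submitted. A minimal sketch, assuming a POSIX setenv:

#include <stdlib.h>
#include <uv.h>

static void work_cb(uv_work_t* req) { (void) req; }
static void after_work_cb(uv_work_t* req, int status) { (void) req; (void) status; }

int main(void) {
  // must happen before the first task is queued; otherwise the default (4) is used
  setenv("UV_THREADPOOL_SIZE", "8", 1);

  uv_work_t req;
  uv_queue_work(uv_default_loop(), &req, work_cb, after_work_cb);
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  return 0;
}

Now let's look at the worker function in detail: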


static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  ...
  arg = NULL;

  uv_mutex_lock(&mutex);

  // an infinite loop keeps the thread running
  for (;;) {
    // if the task queue is empty, wait on the condition variable cond;
    // the mutex is released while waiting and reacquired when the main
    // thread wakes this thread via uv_cond_signal after submitting a task
    while (QUEUE_EMPTY(&wq) || ...) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    // take the task at the head of the queue and unlink it
    q = QUEUE_HEAD(&wq);
    ...
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    ...

    uv_mutex_unlock(&mutex);

    // recover the uv__work that embeds this queue node and run its work function
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    // acquire the event loop's wq_mutex and append the finished task to loop->wq
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);

    // notify the main thread via uv_async_send that a task has finished,
    // so it can execute the task's done function
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // reacquire the lock and process the next task in the queue
    ...
    uv_mutex_lock(&mutex);
    ...
  }
}


The essence of worker is to take a task from the task queue and execute its work function. After the work has run, the task is appended to the wq queue of the event loop, and the main thread is told, via uv_async_send, to execute the task's done function.
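The QUEUE_DATA step above deserves a note: the queue links struct uv__work nodes through their embedded wq field, and QUEUE_DATA (a container_of-style macro) recovers the enclosing struct from a pointer to that field. A minimal standalone sketch of the idiom, using a hypothetical struct task rather than Libuv's own types:

#include <stddef.h>
#include <stdio.h>

struct task {
  int id;
  void* node[2];  // queue node embedded in the task, like uv__work's wq field
};

// recover a pointer to the enclosing struct from a pointer to one of its members
#define container_of(ptr, type, member) \
  ((type*) ((char*) (ptr) - offsetof(type, member)))

int main(void) {
  struct task t = { 42, { NULL, NULL } };
  void* q = &t.node;                              // what the queue hands back
  struct task* p = container_of(q, struct task, node);
  printf("task id = %d\n", p->id);                // prints "task id = 42"
  return 0;
}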


Callback processing


The worker function uses uv_async_send to tell the main thread to execute the callback after a task is completed. How is this implemented? The event loop is involved here; we only introduce it briefly, and more detailed articles will follow. When the event loop is initialized it calls uv_async_init, whose third argument is the function executed when another thread calls uv_async_send. The specific code is as follows:


uv_async_init(loop, &loop->wq_async, uv__work_done);

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  // move all finished tasks from loop->wq into the local queue wq
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}


uv__work_done is simple: it moves the loop's wq queue into a local queue, then takes each task off that queue and calls its done function.
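The same uv_async mechanism is available to user code. The following minimal sketch (not Libuv internals) shows a worker-style thread waking the loop thread with uv_async_send, which triggers the callback registered by uv_async_init, exactly the way the thread pool notifies the main thread above:

#include <stdio.h>
#include <uv.h>

static uv_async_t async;

// runs in the loop (main) thread whenever another thread calls uv_async_send
static void on_async(uv_async_t* handle) {
  printf("woken up by the worker thread\n");
  uv_close((uv_handle_t*) handle, NULL);  // close the handle so the loop can exit
}

// runs in a separate thread, like a thread-pool worker
static void worker_thread(void* arg) {
  (void) arg;
  uv_async_send(&async);  // thread-safe; wakes the loop thread
}

int main(void) {
  uv_thread_t tid;
  uv_loop_t* loop = uv_default_loop();

  uv_async_init(loop, &async, on_async);
  uv_thread_create(&tid, worker_thread, NULL);

  uv_run(loop, UV_RUN_DEFAULT);
  uv_thread_join(&tid);
  return 0;
}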


Conclusion


This article first introduced the producer-consumer model and then walked through the Libuv thread pool in four parts: the task queue, submitting tasks, consuming tasks, and callback processing, in preparation for the next article on Libuv file handling. If you are interested in the Libuv series, you are welcome to follow us.