The schematic diagram of this section is as follows.



The last article didn’t get very far, but Node itself is complex enough that you have to work through the whole chain; I once read 300 lines of Webpack source code just to understand the main property in package.json. Looking back at my earlier posts, the synchronous part held up well, but the asynchronous part was weak. To be honest, I still don’t fully understand asynchronous locks or the underlying operating system API (IOCP); after all, I have no real multithreaded development experience, I’m just a pure technical enthusiast.

This article dives into libuv again, starting from uv_fs_stat. The operating system is Windows. The source code is as follows.

// The parameters are the event loop object loop, the request object req
// that manages this operation, the path, and the completion callback cb
int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
  int err;

  INIT(UV_FS_STAT);
  err = fs__capture_path(req, path, NULL, cb != NULL);
  if (err) {
    return uv_translate_sys_error(err);
  }

  POST;
}
In fact, the Unix version of the code is much cleaner; it goes straight to the point.

int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
  INIT(STAT);
  PATH;
  POST;
}
Either way, it is the same three steps.

The first two steps were introduced in the previous article and will not be repeated here. In short, they initialize the req object based on the operation type, then handle the path: allocating the right amount of space for the path string, and so on.

Again, focus on the POST macro.

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__req_register(loop, req);                                            \
      /* work_req is a structure of type uv__work     */                      \
      /* UV__WORK_FAST_IO is the I/O operation kind   */                      \
      /* uv__fs_work and uv__fs_done are functions    */                      \
      uv__work_submit(loop,                                                   \
                      &req->work_req,                                         \
                      UV__WORK_FAST_IO,                                       \
                      uv__fs_work,                                            \
                      uv__fs_done);                                           \
      return 0;                                                               \
    } else {                                                                  \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  } while (0)
Since we only care about asynchronous operations, look at the if branch. The parameters are explained in the comments. Another point worth noting is the method names: register, submit. In other words, even the asynchronous path does no I/O here; the real work happens somewhere deeper, so keep going.

uv__req_register simply increments the loop’s count of active requests. After each round of polling, the loop checks whether there are still active handles or requests to process, and keeps running if there are; the criterion is whether that count is greater than 0.

Go to the next step uv__work_submit.

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2];
};

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
The first call, uv_once, as its name suggests, executes only once. The function then attaches the loop object and the two methods to the uv__work structure inside the req, and finally calls post.

The uv_once method is interesting. It has nothing to do with the stat operation itself; it is simply preparation shared by all I/O operations, and every I/O operation calls it first. The Windows and Unix implementations are completely different; here is the Windows version.

void uv_once(uv_once_t* guard, void (*callback)(void)) {
  if (guard->ran) {
    return;
  }
  uv__once_inner(guard, callback);
}

static void uv__once_inner(uv_once_t* guard, void (*callback)(void)) {
  DWORD result;
  HANDLE existing_event, created_event;

  // Create or open a named or unnamed event object
  created_event = CreateEvent(NULL, 1, 0, NULL);
  if (created_event == 0) {
    uv_fatal_error(GetLastError(), "CreateEvent");
  }

  // If &guard->event is NULL, store created_event into it;
  // returns the previous value of the first argument
  existing_event = InterlockedCompareExchangePointer(&guard->event,
                                                     created_event,
                                                     NULL);

  if (existing_event == NULL) {
    /* We won the race */
    callback();
    result = SetEvent(created_event);
    assert(result);
    guard->ran = 1;
  } else {
    // ...
  }
}
Let’s explain the above function in pieces.

  • Libuv communicates directly with the operating system, and on Windows it needs the system’s Event objects to assist with asynchronous operations.
  • To set the stage: all I/O operations are handled by separate threads, so uv_once can be called concurrently from several of them. The first case is easy: the first caller has already run all the way through, ran is set to 1, and every later thread returns immediately from uv_once. The second case is trickier: two threads pick up tasks at the same time and both reach uv__once_inner. How do we guarantee the callback runs only once? By using the Windows built-in InterlockedCompareExchangePointer, an atomic compare-and-exchange on a pointer. What does atomic mean here? It is a concept specific to multithreading: atomicity guarantees that each read-modify-write sees the latest value, avoiding the races that multithreaded code is prone to.
  • Only the first thread to call enters the if branch, runs the callback, and sets the event.
Finally, the flow reaches the callback, a function pointer with no return value and no arguments: init_once.

static void init_once(void) {
#ifndef _WIN32
  // Non-Windows initialization, omitted here
  // ...
#endif
  init_threads();
}
Interesting. Here comes the thread.

First, a key data structure in libuv: the queue, defined in src/queue.h. In earlier posts (for example, when discussing the callbacks of each polling phase) I said "linked list", but the code actually uses this queue; a linked list is a superset of a queue, easier to understand, and usually a safe approximation. I bring it up because thread pool initialization uses a lot of queue macros, and I don’t want to dig into them here; I’ll cover them in a separate article.

So here’s the code.

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  // The default thread pool size is 4
  nthreads = ARRAY_SIZE(default_threads);
  // It can be overridden with the environment variable UV_THREADPOOL_SIZE
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  // If set to 0, it is bumped up to 1
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  // The static variable threads manages the threads; allocate space if needed
  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  // Lock and QUEUE related setup here...

  // Create the threads; each one wakes up and runs the worker method
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  // Unrelated code...
}
Setting aside the code we don’t care about, all that is left is: check whether the thread pool size was set manually, initialize and allocate, then loop over the threads and start each one.

Let’s take a quick look at worker. Most of it is queue-related, and the details are in the comments.

static void worker(void* arg) {
  // ...

  // The mutex is used in many places to lock sections of code
  uv_mutex_lock(&mutex);
  for (;;) {
    // ...

    // Fetch a node from the queue
    q = QUEUE_HEAD(&wq);

    // The exit message means there is nothing left to process:
    // 1. Wake up another thread so it can check again (work may arrive soon)
    // 2. Release the lock and leave the loop
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    // Remove the node from the queue
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);

    is_slow_work = 0;
    // Slow I/O does not go through the fast path
    if (q == &run_slow_work_message) {
      // ...
    }

    // The node has been removed from the queue, so the lock can be released
    uv_mutex_unlock(&mutex);

    // Perform the work: the actual I/O operation (e.g. fs.stat...).
    // See the uv__work_submit method above
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    // After the task finishes, append the result to the loop's wq queue
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // Re-acquire the mutex purely to prepare for the next iteration of for (;;)
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
Note that this method is static and must cope with being run by multiple threads at once. The comments are quite detailed; even without knowing much C you should be able to follow the flow.

I thought I could wrap this topic up in one post, but the process is a bit long, so let’s stop here.