
preface

I started this blog series mainly to review and summarize code after reading it, and to have something to refer back to later. The series starts from the examples on the official website and, as far as possible, follows the call chain to record how the core modules of the source code are implemented. The example for this article comes from:

  • libuv – filesystem

Knowledge points involved

  • Thread pool implementation in libuv
  • Asynchronous I/O implementation in libuv
  • Thread communication in libuv
  • Use of mutexes, condition variables, and semaphores in libuv
  • Use of epoll in libuv
  • Use of eventfd in libuv
  • The async npm package
  • Assembler Instructions with C Expression Operands

Example uvcat/main.c

An example of how a time-consuming fs_open task can be implemented asynchronously.

int main(int argc, char **argv) {
    uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_fs_req_cleanup(&open_req);
    uv_fs_req_cleanup(&read_req);
    uv_fs_req_cleanup(&write_req);
    return 0;
}

uv_fs_open

main > uv_fs_open

Start registering tasks with the event loop uv_default_loop.

int uv_fs_open(uv_loop_t* loop,
               uv_fs_t* req,
               const char* path,
               int flags,
               int mode,
               uv_fs_cb cb) {
  // 🔥 Perform some data initialization
  INIT(OPEN);
  // 🔥 Check the file path parameter that was passed in
  PATH;
  req->flags = flags;
  req->mode = mode;
  POST;
}

INIT

main > uv_fs_open > INIT

INIT initializes the fields of uv_fs_t* req. In particular, the callback cb passed to uv_fs_open (on_open in this example) is attached to the cb property of req.

#define INIT(subtype)                                                         \
  do {                                                                        \
    if (req == NULL)                                                          \
      return UV_EINVAL;                                                       \
    UV_REQ_INIT(req, UV_FS);                                                  \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->bufs = NULL;                                                         \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)
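INIT relies on two preprocessor idioms: wrapping the body in do { ... } while (0) so that the macro behaves like a single statement, and using ## to paste the subtype onto a prefix. The following is a minimal sketch of both. The names demo_req_t, DEMO_FS_*, DEMO_INIT and demo_open are hypothetical and are not part of libuv.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical request types, standing in for libuv's UV_FS_* values. */
enum demo_fs_type { DEMO_FS_OPEN = 1, DEMO_FS_READ = 2 };

typedef struct {
  enum demo_fs_type fs_type;
  int result;
} demo_req_t;

/* do { ... } while (0) makes the macro expand to exactly one statement,
 * so it is safe after an unbraced `if`. `##` pastes DEMO_FS_ and the
 * argument into one identifier, e.g. DEMO_FS_OPEN. The `return` leaves
 * the function that uses the macro, just as in INIT. */
#define DEMO_INIT(subtype)              \
  do {                                  \
    if (req == NULL)                    \
      return -1;                        \
    req->fs_type = DEMO_FS_##subtype;   \
    req->result = 0;                    \
  } while (0)

int demo_open(demo_req_t* req) {
  DEMO_INIT(OPEN);
  /* Token pasting produced DEMO_FS_OPEN. */
  assert(req->fs_type == DEMO_FS_OPEN);
  return 0;
}
```

Calling demo_open(NULL) returns -1 from inside the macro, which is the same early-exit behavior that INIT gives uv_fs_open when req is NULL.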

POST

main > uv_fs_open > POST

The uv__work_submit function is called to submit a time-consuming task to the thread pool for completion.

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      /* (loop)->active_reqs.count++; */                                      \
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop,                                                   \
                      &req->work_req,                                         \
                      UV__WORK_FAST_IO,                                       \
                      /* 🔥 calls open to open the file */                     \
                      uv__fs_work,                                            \
                      /* 🔥 calls on_open, passed in as a uv_fs_open argument */ \
                      uv__fs_done);                                           \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)
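POST picks between two modes depending on whether a callback was supplied: with a callback the work is deferred and 0 is returned, without one the work runs immediately and its result is returned. Here is a small sketch of that decision in plain C. Everything in it (sync_req_t, sync_submit, sync_run_pending and the single pending slot that stands in for the thread pool) is hypothetical and only illustrates the control flow.

```c
#include <assert.h>
#include <stddef.h>

typedef struct sync_req sync_req_t;
typedef void (*sync_cb)(sync_req_t* req);

struct sync_req {
  int input;
  int result;
  sync_cb cb;
};

/* Hypothetical stand-in for the thread pool: one pending slot that is
 * run later by sync_run_pending(). */
static sync_req_t* pending = NULL;

/* The "time-consuming" work; plays the role of uv__fs_work. */
static void sync_work(sync_req_t* req) {
  req->result = req->input * 2;
}

/* Same decision as POST: with a callback, defer the work and return 0;
 * without one, do the work now and return its result. */
int sync_submit(sync_req_t* req, int input, sync_cb cb) {
  req->input = input;
  req->result = 0;
  req->cb = cb;
  if (cb != NULL) {
    assert(pending == NULL);  /* the sketch holds only one request */
    pending = req;
    return 0;
  }
  sync_work(req);
  return req->result;
}

/* Plays the role of the loop: finish the deferred work, then call cb. */
void sync_run_pending(void) {
  sync_req_t* req = pending;
  if (req == NULL)
    return;
  pending = NULL;
  sync_work(req);
  req->cb(req);
}

/* Sample callback that records the result it was given. */
int sync_last_result = -1;
void sync_on_done(sync_req_t* req) {
  sync_last_result = req->result;
}
```

With a NULL callback, sync_submit returns the computed value directly; with sync_on_done, the value only becomes visible after sync_run_pending has run, which mirrors how on_open only fires once uv_run is processing events.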

uv_queue_work

For any kind of time-consuming task, libuv also provides another method, the work queue, which makes it easy to submit any number of time-consuming tasks to the thread pool.

The implementation is similar to the POST macro above: it calls uv__work_submit to submit a task, except that instead of uv__fs_work the work can be any function passed in by the user. Here is an example of uv_queue_work.

You can see that a for loop easily submits several fib tasks, each with after_fib as its completion callback, to the thread pool via uv_queue_work.

// uv_queue_work
int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    uv_work_t req[FIB_UNTIL];
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        req[i].data = (void *) &data[i];
        uv_queue_work(loop, &req[i], fib, after_fib);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

uv_queue_work looks very similar to the async npm package, except that async runs its tasks on the current thread while uv_queue_work submits its tasks to the thread pool.

async.parallel([
    function(callback) { ... },
    function(callback) { ... }
], function(err, results) {
    // optional callback
});

uv__work_submit

main > uv_fs_open > POST > uv__work_submit

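uv__work_submit first calls init_once through uv_once, so the thread pool is initialized only once no matter how many tasks are submitted.

It then attaches the work function (uv__fs_work in this example) and the done function (uv__fs_done in this example) to struct uv__work* w, and calls post to put the task on the queue.

At this point, we can look at the init_once function and see that it mainly calls init_threads.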

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  // 🔥 post calls uv_cond_signal
  post(&w->wq, kind);
}
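The effect of uv_once can be sketched with POSIX pthread_once, which, as far as I know, is what uv_once wraps on Unix platforms. The names once_submit, once_init and the counters below are hypothetical; the point is only that the initializer body runs once however many times the submit function is called.

```c
#include <assert.h>
#include <pthread.h>

static pthread_once_t once_guard = PTHREAD_ONCE_INIT;
static int once_init_calls = 0;
static int once_submitted = 0;

/* Plays the role of init_once: this is where the thread pool would be
 * created. */
static void once_init(void) {
  once_init_calls += 1;
}

/* Plays the role of uv__work_submit: every call goes through
 * pthread_once, but the initializer body runs only the first time. */
void once_submit(void) {
  int err = pthread_once(&once_guard, once_init);
  assert(err == 0);
  (void) err;  /* keeps NDEBUG builds free of unused-variable warnings */
  once_submitted += 1;
}

int once_get_init_calls(void) { return once_init_calls; }
int once_get_submitted(void) { return once_submitted; }
```

After three calls to once_submit, once_get_init_calls() is 1 and once_get_submitted() is 3, which is why the thread pool is only created lazily on the first submitted task.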

init_threads – Thread pool

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads

This is the thread pool initialization. Because the code is long, the less important parts are omitted; the corresponding source is /deps/uv/src/threadpool.c.

Below we look at uv_cond and uv_sem, which are used during initialization. The function that each thread runs is worker.

static void init_threads(void) {
  ...
  // 🔥 condition variable
  if (uv_cond_init(&cond))
    abort();

  // 🔥 mutex: a lock taken by a thread must be released by that same thread
  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  // 🔥 semaphore
  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

uv_cond – Condition variable

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_cond

In this example, the threads in the thread pool wait on the condition variable while the task queue is empty. When a task is submitted, the submitting thread calls pthread_cond_signal to wake up one thread to do the work.

/* Submitting thread: wake up one waiting thread */
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

/* Worker thread: sleep until signalled */
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
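To make the wait/signal pairing concrete, here is a self-contained sketch with one worker and one submitted task. It is not libuv code: the "queue" is reduced to a single has_task flag, and cond_worker, cond_submit and cond_demo are hypothetical names. The worker re-checks its predicate in a while loop, in the same way the libuv worker loops on QUEUE_EMPTY.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int has_task = 0;   /* 1 while a task is waiting in the "queue" */
static int task = 0;
static int result = 0;

/* Worker: sleep while the queue is empty. The while loop re-checks the
 * predicate because pthread_cond_wait may wake up spuriously. */
static void* cond_worker(void* arg) {
  (void) arg;
  pthread_mutex_lock(&mutex);
  while (!has_task)
    pthread_cond_wait(&cond, &mutex);  /* releases mutex while asleep */
  has_task = 0;
  result = task * task;
  pthread_mutex_unlock(&mutex);
  return NULL;
}

/* Submitter: enqueue under the mutex, then wake one sleeping worker. */
static void cond_submit(int value) {
  pthread_mutex_lock(&mutex);
  task = value;
  has_task = 1;
  pthread_cond_signal(&cond);
  pthread_mutex_unlock(&mutex);
}

/* Start one worker, submit one task, return what the worker computed. */
int cond_demo(int value) {
  pthread_t thread;
  int err = pthread_create(&thread, NULL, cond_worker, NULL);
  assert(err == 0);
  (void) err;
  cond_submit(value);
  pthread_join(thread, NULL);
  return result;
}
```

The sketch works regardless of which thread gets the mutex first: if the task is submitted before the worker starts waiting, the predicate is already true and the worker never sleeps.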

uv_sem – Semaphore

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > uv_sem

uv_sem_wait also puts a thread to sleep, until another thread calls uv_sem_post. Here, init_threads calls uv_sem_wait once per worker, so the code does not continue until every worker created by uv_thread_create has started running; each worker function calls uv_sem_post exactly once.

This ensures that init_threads does not return until the whole thread pool has been created.

It is similar to uv_cond. For example, the same effect could be achieved with uv_cond by counting each worker as it starts and calling pthread_cond_signal once from the last worker to start.

  • uv_sem_wait – called in the init_threads function
  • uv_sem_post – called in the worker function
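The startup handshake can be sketched with POSIX unnamed semaphores. This is an illustration under assumptions, not libuv's code: SEM_DEMO_THREADS, sem_worker and sem_demo are hypothetical, and sem_init with an unnamed semaphore is assumed to be available (it is on Linux).

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define SEM_DEMO_THREADS 4

static int started[SEM_DEMO_THREADS];

struct sem_demo_arg {
  sem_t* sem;
  int index;
};

/* Each worker marks itself as started, then posts the semaphore once. */
static void* sem_worker(void* p) {
  struct sem_demo_arg* arg = p;
  started[arg->index] = 1;
  sem_post(arg->sem);
  return NULL;
}

/* Returns the number of workers that had started by the time the
 * creator got past its sem_wait calls, or -1 on setup failure. */
int sem_demo(void) {
  pthread_t threads[SEM_DEMO_THREADS];
  struct sem_demo_arg args[SEM_DEMO_THREADS];
  sem_t sem;
  int i;
  int count = 0;

  if (sem_init(&sem, 0, 0) != 0)
    return -1;

  for (i = 0; i < SEM_DEMO_THREADS; i++) {
    args[i].sem = &sem;
    args[i].index = i;
    if (pthread_create(&threads[i], NULL, sem_worker, &args[i]) != 0)
      return -1;
  }

  /* One wait per worker: this loop cannot finish before every worker
   * has posted. */
  for (i = 0; i < SEM_DEMO_THREADS; i++)
    sem_wait(&sem);

  for (i = 0; i < SEM_DEMO_THREADS; i++)
    count += started[i];
  assert(count == SEM_DEMO_THREADS);  /* guaranteed by the waits above */

  for (i = 0; i < SEM_DEMO_THREADS; i++)
    pthread_join(threads[i], NULL);
  sem_destroy(&sem);
  return count;
}
```

The semaphore starts at 0, so each sem_wait in the creator consumes exactly one sem_post from a worker. The count is taken before pthread_join on purpose, to show that the semaphore alone is enough to know all workers are running.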

worker

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker

When uv__work_submit submits a task, post calls uv_cond_signal (pthread_cond_signal underneath), and a sleeping thread wakes up and starts working.

static void worker(void* arg) {
  ...
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      // 🔥 sleep until woken up by uv_cond_signal
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    ...
    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    // 🔥 start working
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 🔥 the work (uv__fs_work) is done; notify through the corresponding fd
    uv_async_send(&w->loop->wq_async);
    ...
  }
}

uv_async_send

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send

When a thread finishes its time-consuming w->work task (uv__fs_work in this example, set in uv__work_submit), it is time to notify the main thread.

uv_async_send first checks whether a notification for this handle is already pending. If another thread is already in the middle of calling uv__async_send for it, this call returns immediately, so a single thread ends up notifying the main thread.

What does the cmpxchgi function called here do?

int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  /* Tell the other thread we're busy with the handle. */
  if (cmpxchgi(&handle->pending, 0, 1) != 0)
    return 0;

  /* Wake up the other thread's event loop. */
  uv__async_send(handle->loop);

  /* Tell the other thread we're done. */
  if (cmpxchgi(&handle->pending, 1, 2) != 1)
    abort();

  return 0;
}

cmpxchgi

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi

It doesn't matter if you don't understand the inline assembly; we can look at the equivalent __sync_val_compare_and_swap function instead.

UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) {
#if defined(__i386__) || defined(__x86_64__)
  int out;
  __asm__ __volatile__ ("lock; cmpxchg %2, %1;"
                        : "=a" (out), "+m" (*(volatile int*) ptr)
                        : "r" (newval), "0" (oldval)
                        : "memory");
  return out;
#elif defined(__MVS__)
  unsigned int op4;
  if (__plo_CSST(ptr, (unsigned int*) &oldval, newval,
                (unsigned int*) ptr, *ptr, &op4))
    return oldval;
  else
    return op4;
#elif defined(__SUNPRO_C) || defined(__SUNPRO_CC)
  return atomic_cas_uint((uint_t *)ptr, (uint_t)oldval, (uint_t)newval);
#else
  return __sync_val_compare_and_swap(ptr, oldval, newval);
#endif
}

__sync_val_compare_and_swap

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > cmpxchgi > __sync_val_compare_and_swap

It provides an atomic compare-and-swap: if *ptr == oldval, newval is written to *ptr, and in either case the value that *ptr held before the operation is returned. After reading the explanation it is roughly clear: it behaves like a conditional assignment. So, back to the main line, let's look at the uv__async_send function.

type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
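The return-value convention is easy to get backwards, so here is a short sketch that exercises it, assuming a GCC- or Clang-compatible compiler for the builtin. demo_cas, demo_claim_wakeup and demo_cas_walkthrough are hypothetical helpers; demo_claim_wakeup mirrors only the 0 to 1 step of uv_async_send.

```c
#include <assert.h>

/* Compare-and-swap: if *ptr == oldval, store newval. Either way, return
 * the value *ptr held before the call. Uses the GCC/Clang builtin that
 * the fallback branch of cmpxchgi uses. */
static int demo_cas(int* ptr, int oldval, int newval) {
  return __sync_val_compare_and_swap(ptr, oldval, newval);
}

/* Hypothetical helper mirroring the first step of uv_async_send:
 * returns 1 if this caller moved pending from 0 to 1 and therefore
 * should wake the loop, 0 if a wakeup is already pending. */
int demo_claim_wakeup(int* pending) {
  return demo_cas(pending, 0, 1) == 0;
}

/* Small walk-through of the return values; returns the final value. */
int demo_cas_walkthrough(void) {
  int value = 0;
  int before;

  before = demo_cas(&value, 0, 1);  /* matches: value becomes 1 */
  assert(before == 0 && value == 1);

  before = demo_cas(&value, 0, 5);  /* no match: value stays 1 */
  assert(before == 1 && value == 1);

  (void) before;
  return value;
}
```

If several threads call demo_claim_wakeup on the same flag, only the one that observes the old value 0 gets 1 back, which is how repeated notifications collapse into a single wakeup.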

uv__async_send

main > uv_fs_open > POST > uv__work_submit > init_once > init_threads > worker > uv_async_send > uv__async_send

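This function mainly writes data to the fd that the loop uses for thread communication (loop->async_io_watcher.fd on Linux, loop->async_wfd otherwise). Once epoll detects that this fd is readable, the callback of the corresponding I/O observer is executed.

Before we get to epoll, when was this I/O observer registered?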

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}
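The write loop in uv__async_send retries when a signal interrupts the call. The sketch below isolates that retry pattern and tries it on an ordinary pipe, sending a single byte the way the non-Linux branch does. demo_write_retry and demo_pipe_wakeup are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <unistd.h>

/* Write len bytes, retrying if a signal interrupts the call. Returns
 * whatever write() returned on the last attempt. */
ssize_t demo_write_retry(int fd, const void* buf, size_t len) {
  ssize_t r;
  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);
  return r;
}

/* Send a one-byte wakeup through a pipe and read it back. Returns the
 * number of bytes that arrived, or -1 on error. */
int demo_pipe_wakeup(void) {
  int fds[2];
  char byte = 0;
  ssize_t r;

  if (pipe(fds) != 0)
    return -1;

  r = demo_write_retry(fds[1], "", 1);  /* one NUL byte, like buf = "" */
  assert(r == 1);
  if (r == 1)
    r = read(fds[0], &byte, 1);

  close(fds[0]);
  close(fds[1]);
  return (int) r;
}
```

The content of the byte does not matter; the reader only cares that the fd became readable.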

uv__io_start – Registers I/O observers

main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_start

uv__io_start is called to register an I/O observer. So when is the fd that the I/O observer needs to watch set?

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  ...
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}
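The loop->watchers[w->fd] = w assignment is what later lets the poll phase go from a ready fd straight to its callback. A simplified sketch of that table follows. It assumes a fixed-size array (DEMO_MAX_FDS), and all demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_FDS 16

typedef struct demo_watcher demo_watcher_t;
typedef void (*demo_io_cb)(demo_watcher_t* w, unsigned int events);

struct demo_watcher {
  demo_io_cb cb;
  int fd;
  unsigned int last_events;  /* recorded by the sample callback */
};

typedef struct {
  demo_watcher_t* watchers[DEMO_MAX_FDS];  /* slot index == fd */
  unsigned int nfds;
} demo_loop_t;

/* Register w under its fd; registering the same fd twice is a no-op. */
void demo_io_start(demo_loop_t* loop, demo_watcher_t* w) {
  assert(w->fd >= 0 && w->fd < DEMO_MAX_FDS);
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

/* What a poll phase does once the kernel reports fd as ready: look the
 * watcher up by fd and call its callback. Returns 1 if a callback ran,
 * 0 if nothing is registered for fd. */
int demo_dispatch(demo_loop_t* loop, int fd, unsigned int events) {
  demo_watcher_t* w;
  if (fd < 0 || fd >= DEMO_MAX_FDS)
    return 0;
  w = loop->watchers[fd];
  if (w == NULL)
    return 0;
  w->cb(w, events);
  return 1;
}

/* Sample callback: remember which events were delivered. */
void demo_record_events(demo_watcher_t* w, unsigned int events) {
  w->last_events = events;
}
```

Indexing by fd makes the lookup O(1), at the cost of an array as large as the highest fd in use.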

uv__io_init – Initializes the I/O observer

main > uv_default_loop > uv_loop_init > uv_async_init > uv__async_start > uv__io_init

Note that the goal in this example is for the main thread to learn when a worker thread has finished its task, so we need a file descriptor for thread communication that the main thread can observe.

You can see that uv__async_start obtains the thread-communication fd through eventfd and then attaches it to the observer by calling uv__io_init. The observer's callback is set to uv__async_io.

Now that the I/O observer has been initialized, we wait for epoll to detect data being written to the fd and call the callback that was set on the observer.

static int uv__async_start(uv_loop_t* loop) {
 ...
#ifdef __linux__
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
#else
  err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
  if (err < 0)
    return err;
#endif

  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}
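An eventfd is a kernel-side 64-bit counter: each write adds to it and one read returns the total and resets it. The Linux-only sketch below shows two notifications collapsing into one read. demo_eventfd_coalesce is a hypothetical name, and the flags are the same ones uv__async_start passes.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Two notifications are written before the reader runs; the reader
 * sees them as a single wakeup. Returns the counter value read, or 0
 * on error. */
uint64_t demo_eventfd_coalesce(void) {
  const uint64_t one = 1;
  uint64_t value = 0;
  uint64_t tmp = 0;
  ssize_t r;
  int fd;

  assert(sizeof(one) == 8);  /* eventfd transfers exactly 8 bytes */

  fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (fd < 0)
    return 0;

  /* Each write adds its value to the counter. */
  if (write(fd, &one, sizeof(one)) != (ssize_t) sizeof(one) ||
      write(fd, &one, sizeof(one)) != (ssize_t) sizeof(one)) {
    close(fd);
    return 0;
  }

  /* One read returns the accumulated count and resets it to zero. */
  r = read(fd, &value, sizeof(value));
  if (r != (ssize_t) sizeof(value))
    value = 0;

  /* The counter is now zero, so a nonblocking read fails with EAGAIN. */
  r = read(fd, &tmp, sizeof(tmp));
  assert(r == -1 && errno == EAGAIN);
  (void) r;

  close(fd);
  return value;
}
```

This is why a single eventfd is enough for any number of worker threads: the main thread does not need to know how many wrote, only that at least one did.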

uv__async_io – Asynchronous I/O observer callback

uv__async_io first reads the data written by the other threads, and finally calls h->async_cb.

So where does the h->async_cb function come from?

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  for (;;) {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    if (0 == uv__async_spin(h))
      continue;  /* Not pending. */

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h);
  }
}
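The read loop at the top of uv__async_io drains whatever has accumulated on the fd before any callbacks run. The sketch below reproduces that loop on a nonblocking pipe and additionally counts the bytes it discards. demo_drain and demo_drain_pipe are hypothetical helpers, and returning -1 replaces libuv's abort().

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Read and discard everything currently buffered on a nonblocking fd.
 * Returns the number of bytes discarded, or -1 on an unexpected error. */
long demo_drain(int fd) {
  char buf[1024];
  long total = 0;
  ssize_t r;

  for (;;) {
    r = read(fd, buf, sizeof(buf));
    if (r > 0) {
      total += r;
      if (r == (ssize_t) sizeof(buf))
        continue;  /* buffer was filled: there may be more */
      break;       /* short read: nothing left for now */
    }
    if (r == 0)
      break;       /* writer closed its end */
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;       /* nothing buffered */
    if (errno == EINTR)
      continue;    /* interrupted by a signal: retry */
    return -1;
  }
  return total;
}

/* Put n bytes (n <= 4096) into a nonblocking pipe, then drain it. */
long demo_drain_pipe(size_t n) {
  char data[4096];
  int fds[2];
  long drained;

  assert(n <= sizeof(data));
  if (pipe(fds) != 0)
    return -1;
  if (fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
    return -1;

  memset(data, 'x', n);
  if (write(fds[1], data, n) != (ssize_t) n)
    return -1;

  drained = demo_drain(fds[0]);
  close(fds[0]);
  close(fds[1]);
  return drained;
}
```

With 3000 buffered bytes the loop reads 1024, 1024 and then 952 bytes and stops on the short read; with exactly 2048 it stops on EAGAIN instead.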

uv_async_init

main > uv_default_loop > uv_loop_init > uv_async_init

h->async_cb is set in uv_async_init, so the third argument, async_cb, can be found by looking at where uv_async_init is called.

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

uv_loop_init

main > uv_default_loop > uv_loop_init

So the h->async_cb(h) called in uv__async_io is uv__work_done, which is passed to uv_async_init in uv_loop_init.

int uv_loop_init(uv_loop_t* loop) {
  ...
  err = uv_async_init(loop, &loop->wq_async, uv__work_done);
  ...
}

uv__work_done

uv__work_done mainly calls w->done(w, err), the done function that was attached in uv__work_submit (uv__fs_done in this example).

void uv__work_done(uv_async_t* handle) {
  ...

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

uv__fs_done

The uv__fs_done function mainly calls req->cb(req), which is the on_open function set in INIT.

static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == UV_ECANCELED) {
    assert(req->result == 0);
    req->result = UV_ECANCELED;
  }

  req->cb(req);
}
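Both uv__work_done and uv__fs_done use container_of to get from an embedded member back to the struct that contains it. Here is a small sketch of the idea, written the way I understand libuv defines the macro (with offsetof). The demo_* types are hypothetical stand-ins for uv__work and uv_fs_t.

```c
#include <assert.h>
#include <stddef.h>

/* Subtract the member's offset from the member's address to get the
 * address of the enclosing struct. */
#define demo_container_of(ptr, type, member) \
  ((type*) ((char*) (ptr) - offsetof(type, member)))

struct demo_work {
  int status;
};

/* Hypothetical request with an embedded work item, like the work_req
 * field of uv_fs_t. */
struct demo_fs_req {
  int result;
  struct demo_work work_req;
};

/* A "done" callback only receives the embedded member... */
int demo_done(struct demo_work* w) {
  /* ...and recovers the request that contains it. */
  struct demo_fs_req* req =
      demo_container_of(w, struct demo_fs_req, work_req);
  assert(&req->work_req == w);
  return req->result;
}
```

This is why the thread pool can stay generic: it only ever sees struct uv__work pointers, and each subsystem recovers its own request type in its done function.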

epoll – Concept

epoll is the Linux kernel's improved version of poll, designed for handling large numbers of file descriptors. It is an enhanced version of the select/poll I/O multiplexing interface on Linux, and it can significantly improve a program's CPU utilization when only a few of a large number of concurrent connections are active. Part of the reason is that, when fetching events, it does not need to traverse the entire set of watched descriptors; it only traverses the descriptors that kernel I/O events have asynchronously woken up and added to the ready queue. Besides level-triggered I/O events like those of select/poll, epoll also provides edge-triggered I/O events, which lets user-space programs cache the I/O state and make fewer epoll_wait/epoll_pwait calls, improving application efficiency.

int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
  1. epoll_create – Creates an epoll instance in the kernel and returns an epoll file descriptor. In the original implementation, the caller used the size parameter to tell the kernel how many file descriptors it intended to watch; if the number watched exceeded size, the kernel expanded automatically. size no longer has that meaning, but the caller must still pass a value greater than 0 for backward compatibility.
  2. epoll_ctl – Adds, modifies, or removes the watch on events for fd in the kernel epoll instance referred to by epfd. op can be EPOLL_CTL_ADD, EPOLL_CTL_MOD, or EPOLL_CTL_DEL, which respectively add a new event, modify the event types watched on the file descriptor, and remove an event from the instance. If the events field of event has the EPOLLET flag set, the event is watched in edge-triggered mode.
  3. epoll_wait – With timeout 0, epoll_wait always returns immediately. With timeout -1, epoll_wait blocks until any registered event is ready. With a positive timeout, epoll_wait blocks until timeout milliseconds have elapsed or a registered event becomes ready. Because of kernel scheduling delays, the actual blocking time may be slightly longer than timeout milliseconds.
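The three calls above can be tried together in a few lines. The following Linux-only sketch watches an eventfd, checks that nothing is ready before a write, and that exactly one event is reported afterwards. demo_epoll_roundtrip is a hypothetical name, and the eventfd is used only because it is a convenient fd to make readable.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Returns 1 if epoll reported the eventfd as readable only after it
 * was written to, 0 if the behaviour differed, -1 on setup failure. */
int demo_epoll_roundtrip(void) {
  struct epoll_event ev;
  struct epoll_event ready[4];
  const uint64_t one = 1;
  int epfd, efd, before, after, ok;

  assert(sizeof(one) == 8);  /* eventfd transfers exactly 8 bytes */

  epfd = epoll_create1(EPOLL_CLOEXEC);
  if (epfd == -1)
    return -1;
  efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (efd == -1) {
    close(epfd);
    return -1;
  }

  /* epoll_ctl: start watching efd for readability (level-triggered). */
  memset(&ev, 0, sizeof(ev));
  ev.events = EPOLLIN;
  ev.data.fd = efd;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) == -1) {
    close(efd);
    close(epfd);
    return -1;
  }

  /* timeout 0: return immediately; nothing has been written yet. */
  before = epoll_wait(epfd, ready, 4, 0);

  if (write(efd, &one, sizeof(one)) != (ssize_t) sizeof(one)) {
    close(efd);
    close(epfd);
    return -1;
  }

  /* timeout -1: block until an event is ready (one already is). */
  after = epoll_wait(epfd, ready, 4, -1);

  ok = (before == 0 && after == 1 && ready[0].data.fd == efd &&
        (ready[0].events & EPOLLIN) != 0);
  close(efd);
  close(epfd);
  return ok;
}
```

The data.fd field is whatever the caller stored at EPOLL_CTL_ADD time; epoll hands it back untouched, which is how the caller maps a ready event back to its descriptor.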

epoll_create – Creates an epoll object

main > uv_default_loop > uv_loop_init > uv__platform_loop_init > epoll_create

As shown in the figure below, when a process calls epoll_create, the kernel creates an eventpoll object (the object represented by epfd in the program). The eventpoll object is also a member of the file system and, like a socket, has a wait queue.

In libuv, the epoll object is created in the uv__platform_loop_init function.

int uv__platform_loop_init(uv_loop_t* loop) {
  int fd;

  // 🔥 create the epoll fd
  fd = epoll_create1(O_CLOEXEC);

  /* epoll_create1() can fail either because it's not implemented (old kernel)
   * or because it doesn't understand the O_CLOEXEC flag.
   */
  if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
    fd = epoll_create(256);

    if (fd != -1)
      uv__cloexec(fd, 1);
  }

  loop->backend_fd = fd;
  loop->inotify_fd = -1;
  loop->inotify_watchers = NULL;

  if (fd == -1)
    return UV__ERR(errno);

  return 0;
}

epoll_ctl – Maintains a monitor list

main > uv_run > uv__io_poll > epoll_ctl

After the epoll object is created, epoll_ctl can be used to add or remove the sockets to watch. Take adding sockets as an example: as shown in the figure below, if sock1, sock2, and sock3 are watched through epoll_ctl, the kernel adds eventpoll to the wait queue of each of these three sockets.

In libuv, epoll_ctl is called in the uv__io_poll function.

uv__io_poll is a very important phase of the event loop, which was covered in the previous section, Event loop.

void uv__io_poll(uv_loop_t* loop, int timeout) {
  ...
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    ...
    // 🔥 epoll_ctl - event registration function; registers the new fd
    // with the epoll instance referred to by epfd
    if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
      if (errno != EEXIST)
        abort();

      assert(op == EPOLL_CTL_ADD);

      /* We've reactivated a file descriptor that's been watched before. */
      if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
        abort();
    }

    w->events = w->pevents;
  }
  ...
}

epoll_wait

main > uv_run > uv__io_poll > epoll_wait

When a socket receives data, the interrupt handler adds a reference to that socket to eventpoll's ready list (rdlist). As shown in the figure below, after sock2 and sock3 receive data, the interrupt handler makes rdlist reference these two sockets.

When the program reaches epoll_wait, epoll_wait returns if rdlist already references a socket, and blocks the process if rdlist is empty.

This is also where the timeout computed by the event loop in the previous section is used: if no other phase has pending tasks, the process blocks here until a timer is due.

In libuv, epoll_wait is called in the uv__io_poll function.

void uv__io_poll(uv_loop_t* loop, int timeout) {
  ...
  for (;;) {
    if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
      // 🔥 epoll_pwait - blocks until any registered event is ready
      nfds = epoll_pwait(loop->backend_fd,
                         events,
                         ARRAY_SIZE(events),
                         timeout,
                         &sigset);
      if (nfds == -1 && errno == ENOSYS) {
        uv__store_relaxed(&no_epoll_pwait_cached, 1);
        no_epoll_pwait = 1;
      }
    } else {
      // 🔥 epoll_wait - blocks until any registered event is ready
      nfds = epoll_wait(loop->backend_fd,
                        events,
                        ARRAY_SIZE(events),
                        timeout);
      if (nfds == -1 && errno == ENOSYS) {
        uv__store_relaxed(&no_epoll_wait_cached, 1);
        no_epoll_wait = 1;
      }
    }
    ...
    // 🔥 iterate over the descriptors that kernel I/O events asynchronously
    // woke up and added to the ready queue
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      fd = pe->data.fd;
      ...
      if (pe->events != 0) {
        /* Run signal watchers last.  This also affects child process watchers
         * because those are implemented in terms of signal watchers.
         */
        if (w == &loop->signal_io_watcher) {
          have_signals = 1;
        } else {
          uv__metrics_update_idle_time(loop);
          // 🔥 call the callback registered on the observer
          w->cb(loop, w, pe->events);
        }

        nevents++;
      }
    }
    ...
  }
}

uv_fs_req_cleanup

main > uv_fs_req_cleanup

After the uv_fs_open callback on_open has been called and the loop has no more active requests, the uv_run call in this example returns. The code then runs uv_fs_req_cleanup to release the memory held by the requests, and the program exits successfully.

summary

Q: In this example, how can a time-consuming fs_open task be implemented asynchronously?

A: The thread pool is initialized when the first task is submitted, and its threads sleep while the task queue is empty. When uv_fs_open is called to submit an fs_open task, pthread_cond_signal wakes up one thread, which runs the open work (uv__fs_work) to completion and then notifies the main thread.

Q: So how is the main thread notified?

A: The time-consuming task is handed to another thread. To be notified, the main thread first obtains a file descriptor for thread communication through eventfd, then registers an I/O observer for it through the epoll mechanism. When the other thread finishes the task, it writes data to that fd; epoll detects the write, and the main thread calls the callback that was set up in advance.
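The whole round trip can be compressed into one Linux-only sketch: a worker thread does the slow part and writes to an eventfd, while the main thread sleeps in epoll_wait and picks up the result when it wakes. This is a simplified illustration, not libuv's implementation. It uses a single thread instead of a pool, a 5 second safety timeout, and hypothetical names (demo_task, demo_worker, demo_async_roundtrip).

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

struct demo_task {
  int input;
  int output;     /* written by the worker thread */
  int notify_fd;  /* eventfd the worker writes to when finished */
};

/* Worker thread: do the slow part, then wake the main thread. */
static void* demo_worker(void* p) {
  struct demo_task* task = p;
  const uint64_t one = 1;
  task->output = task->input + 1;  /* stand-in for the blocking open() */
  if (write(task->notify_fd, &one, sizeof(one)) != (ssize_t) sizeof(one))
    task->output = -1;
  return NULL;
}

/* Main thread: hand the task to a worker, sleep in epoll_wait until the
 * eventfd becomes readable, then deliver the result. Returns -1 on any
 * failure. */
int demo_async_roundtrip(int input) {
  struct demo_task task;
  struct epoll_event ev, ready;
  pthread_t thread;
  uint64_t count = 0;
  int epfd, n, result = -1;

  task.input = input;
  task.output = 0;
  task.notify_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  epfd = epoll_create1(EPOLL_CLOEXEC);
  if (task.notify_fd == -1 || epfd == -1)
    return -1;

  memset(&ev, 0, sizeof(ev));
  ev.events = EPOLLIN;
  ev.data.fd = task.notify_fd;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, task.notify_fd, &ev) == -1)
    return -1;
  if (pthread_create(&thread, NULL, demo_worker, &task) != 0)
    return -1;

  /* Sleep until the worker's write makes the eventfd readable
   * (5 s safety timeout so the sketch can never hang). */
  n = epoll_wait(epfd, &ready, 1, 5000);
  pthread_join(thread, NULL);

  if (n == 1 && ready.data.fd == task.notify_fd) {
    /* Like uv__async_io: consume the notification first... */
    if (read(task.notify_fd, &count, sizeof(count)) ==
        (ssize_t) sizeof(count)) {
      assert(count == 1);
    }
    /* ...then deliver the result, as the done callback would. */
    result = task.output;
  }

  close(epfd);
  close(task.notify_fd);
  return result;
}
```

In libuv the main thread is not dedicated to one task as it is here: the same epoll_wait call also serves sockets, timers and every other watcher, which is what makes the file operation asynchronous from the caller's point of view.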

Original link: github.com/xiaoxiaojx/… If you are interested, you can click Star to support it. Thank you for reading