.
preface
I started writing blog posts mainly to review and summarize code after reading it, and also to have something to refer back to later. This series starts from the examples on the official website and, as far as possible, follows the call chain to record how the core modules of the source code are implemented. The example for this post comes from:
- libuv – signals
Knowledge points involved
- Libuv signal implementation
- List of Unix signals
- The use of red-black trees in signaling
- Sigaction (2) — Linux Manual Page
- Checking if errno != EINTR: what does it mean?
- Pipe (2) — Linux manual Page
- Pthread_atfork (3) – Linux manual Page
- Pthread_sigmask (3) — Linux manual Page
Example: signal/main.c
The example starts two worker threads that all listen for SIGUSR1: the first thread registers two signal handles on a single loop, and the second thread registers one signal handle on each of two separate loops.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
/*
 * Allocate and initialise a new libuv event loop.
 *
 * Returns the new loop, or NULL when either the allocation or
 * uv_loop_init() fails. On failure nothing is leaked and the caller never
 * receives a loop that was not initialised.
 */
uv_loop_t* create_loop()
{
    uv_loop_t *loop = malloc(sizeof *loop);
    if (loop == NULL) {
        return NULL;
    }

    if (uv_loop_init(loop) != 0) {
        /* Initialisation failed: release the memory instead of returning
         * a loop that must not be used. */
        free(loop);
        return NULL;
    }

    return loop;
}
/*
 * Signal callback shared by every handle in this example: report the signal
 * number on stdout, then stop the handle that received it.
 */
void signal_handler(uv_signal_t *handle, int signum)
{
    fprintf(stdout, "Signal received: %d\n", signum);

    /* Stop this handle now that it has reported the signal. */
    uv_signal_stop(handle);
}
/*
 * Thread entry point: two signal handlers in one loop.
 *
 * Creates one loop, registers two SIGUSR1 handles on it and runs the loop
 * until it has no more work (signal_handler stops each handle after its first
 * signal).
 *
 * userp is unused.
 *
 * The loop is not closed or freed here; main() returns right after joining
 * the worker threads, so process exit reclaims it.
 */
void thread1_worker(void *userp)
{
    (void) userp;

    uv_loop_t *loop1 = create_loop();
    if (loop1 == NULL) {
        /* create_loop() returns NULL on failure; never hand that to libuv. */
        fprintf(stderr, "thread1: failed to create event loop\n");
        return;
    }

    uv_signal_t sig1a, sig1b;

    /* A handle is only started when its initialisation succeeded. */
    if (uv_signal_init(loop1, &sig1a) != 0 ||
        uv_signal_start(&sig1a, signal_handler, SIGUSR1) != 0) {
        fprintf(stderr, "thread1: failed to watch SIGUSR1 (first handle)\n");
    }

    if (uv_signal_init(loop1, &sig1b) != 0 ||
        uv_signal_start(&sig1b, signal_handler, SIGUSR1) != 0) {
        fprintf(stderr, "thread1: failed to watch SIGUSR1 (second handle)\n");
    }

    uv_run(loop1, UV_RUN_DEFAULT);
}
/*
 * Thread entry point: two signal handlers, each in its own loop.
 *
 * Creates two loops, registers one SIGUSR1 handle on each, and alternates
 * non-blocking runs of both loops until neither reports remaining work.
 *
 * userp is unused.
 *
 * The loops are not closed or freed here; main() returns right after joining
 * the worker threads, so process exit reclaims them.
 */
void thread2_worker(void *userp)
{
    (void) userp;

    uv_loop_t *loop2 = create_loop();
    uv_loop_t *loop3 = create_loop();
    if (loop2 == NULL || loop3 == NULL) {
        /* create_loop() returns NULL on failure; never hand that to libuv. */
        fprintf(stderr, "thread2: failed to create event loops\n");
        return;
    }

    uv_signal_t sig2;
    /* A handle is only started when its initialisation succeeded. */
    if (uv_signal_init(loop2, &sig2) != 0 ||
        uv_signal_start(&sig2, signal_handler, SIGUSR1) != 0) {
        fprintf(stderr, "thread2: failed to watch SIGUSR1 (first loop)\n");
    }

    uv_signal_t sig3;
    if (uv_signal_init(loop3, &sig3) != 0 ||
        uv_signal_start(&sig3, signal_handler, SIGUSR1) != 0) {
        fprintf(stderr, "thread2: failed to watch SIGUSR1 (second loop)\n");
    }

    /* Poll both loops without blocking. Because of the short-circuit `||`,
     * the second loop is only polled on iterations where the first one
     * reports no remaining work. */
    while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
    }
}
/*
 * Print the process id (so SIGUSR1 can be sent to it with kill), start both
 * worker threads and wait for them to finish.
 *
 * Returns 0 on success, 1 when a worker thread could not be created.
 */
int main()
{
    /* pid_t is not guaranteed to be int; cast to match the %d specifier. */
    printf("PID %d\n", (int) getpid());

    uv_thread_t thread1, thread2;
    int err;

    err = uv_thread_create(&thread1, thread1_worker, NULL);
    if (err != 0) {
        fprintf(stderr, "failed to create thread 1: %d\n", err);
        return 1;
    }

    err = uv_thread_create(&thread2, thread2_worker, NULL);
    if (err != 0) {
        fprintf(stderr, "failed to create thread 2: %d\n", err);
        /* Only thread1 exists; joining thread2 would use an id that was
         * never created. */
        uv_thread_join(&thread1);
        return 1;
    }

    uv_thread_join(&thread1);
    uv_thread_join(&thread2);
    return 0;
}
Copy the code
The SIGUSR1 signal used in this example is user-defined signal 1.
- For example, to send signal 10 (the number of SIGUSR1 on most Linux architectures) to process 12345:
$ kill -10 12345
Copy the code
- Equivalently, SIGUSR1 can be sent to process 12345 by name with the following command:
$ kill -SIGUSR1 12345
Copy the code
uv_signal_init
thread1_worker > uv_signal_init
uv_signal_init initializes some data on the loop and the handle, mainly by calling the uv__signal_loop_once_init function.
/*
 * Initialise a signal handle on the given loop.
 *
 * Runs uv__signal_loop_once_init(loop) first and returns its result unchanged
 * if it is non-zero. Otherwise the handle is initialised as a UV_SIGNAL handle
 * with its signal number and both signal counters cleared, and 0 is returned.
 */
int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
  int rc = uv__signal_loop_once_init(loop);

  if (rc != 0)
    return rc;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);

  /* signum == 0 means the handle is not watching any signal yet. */
  handle->dispatched_signals = 0;
  handle->caught_signals = 0;
  handle->signum = 0;

  return 0;
}
Copy the code
uv__signal_loop_once_init
thread1_worker > uv_signal_init > uv__signal_loop_once_init
- If the loop already has a signal pipe fd, return immediately.
- Call uv__make_pipe (analyzed in the IPC post of this libuv source code study series), whose main job is to call the pipe2 function.
pipe2: creates a pipe, a one-way data channel that can be used for inter-process communication. The array pipefd is used to return two file descriptors referring to the ends of the pipe: pipefd[0] refers to the read end of the pipe and pipefd[1] refers to the write end. Data written to the write end of the pipe is buffered by the kernel until it is read from the read end.
- Call uv__io_init to initialize an I/O observer whose callback is uv__signal_event. The fd being observed is the read end of the pipe obtained from pipe2. For the I/O-related implementation, see the post on the thread pool and I/O.
- Call uv__io_start to register the I/O observer that was just initialized.
/*
 * Set up the loop's signal pipe and its I/O watcher the first time it is
 * needed.
 *
 * Returns 0 when the pipe already exists or was set up successfully, or the
 * non-zero result of uv__make_pipe() when creating the pipe fails.
 */
static int uv__signal_loop_once_init(uv_loop_t* loop) {
  int err;

  /* Return if already initialized. */
  if (loop->signal_pipefd[0] != -1)
    return 0;

  err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
  if (err)
    return err;

  /* Watch signal_pipefd[0] for POLLIN; uv__signal_event is the callback. */
  uv__io_init(&loop->signal_io_watcher,
              uv__signal_event,
              loop->signal_pipefd[0]);
  uv__io_start(loop, &loop->signal_io_watcher, POLLIN);

  return 0;
}
Copy the code
uv_signal_start
thread1_worker > uv_signal_start
uv_signal_start mainly calls uv__signal_start; libuv has a large number of similarly named function pairs like this…
- If the handle is already listening for this signum, just update the callback and return. If it was listening for a different signal, stop it first.
- Call uv__signal_block_and_lock to block signals and take the lock.
- Call uv__signal_first_handle to check whether signum already has a handle registered. If it does, uv__signal_register_handler is not called again (unless the existing handle is one-shot and the new one is not).
- Otherwise call uv__signal_register_handler to register the handler function for signum.
- Add the handle to the red-black tree via RB_INSERT.
- Call uv__signal_unlock_and_unblock to release the lock: it writes data back once so that another thread waiting in uv__signal_block_and_lock can continue.
/*
 * Start watching `signum` with `handle`, invoking `signal_cb` on delivery.
 * A non-zero `oneshot` marks the handle with UV_SIGNAL_ONE_SHOT.
 *
 * Returns 0 on success, or the error from uv__signal_register_handler() when
 * the process-wide handler cannot be installed.
 *
 * The leading `...` is an elision in this excerpt: the declarations of
 * saved_sigmask, first_handle and err are not shown.
 */
static int uv__signal_start(uv_signal_t* handle,
                            uv_signal_cb signal_cb,
                            int signum,
                            int oneshot) {
  ...

  /* Already watching this signal: only the callback needs replacing. */
  if (signum == handle->signum) {
    handle->signal_cb = signal_cb;
    return 0;
  }

  /* If the signal handler was already active, stop it first. */
  if (handle->signum != 0) {
    uv__signal_stop(handle);
  }

  uv__signal_block_and_lock(&saved_sigmask);

  /* Register the handler when no handle watches this signum yet, or when the
   * first existing handle is one-shot and this one is not. */
  first_handle = uv__signal_first_handle(signum);
  if (first_handle == NULL ||
      (!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
    err = uv__signal_register_handler(signum, oneshot);
    if (err) {
      /* Registering the signal handler failed. Must be an invalid signal. */
      uv__signal_unlock_and_unblock(&saved_sigmask);
      return err;
    }
  }

  handle->signum = signum;
  if (oneshot)
    handle->flags |= UV_SIGNAL_ONE_SHOT;

  RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);

  uv__signal_unlock_and_unblock(&saved_sigmask);

  handle->signal_cb = signal_cb;
  uv__handle_start(handle);

  return 0;
}
Copy the code
uv__signal_block_and_lock
thread1_worker > uv_signal_start > uv__signal_block_and_lock
- sigfillset: initializes the signal set to full, i.e. the set includes every signal (sigemptyset is the function that initializes a set to empty).
- pthread_sigmask: this function can be used in multithreaded programs that wish to process signals only in the main thread. Each thread has its own signal mask, and pthread_sigmask can be used to block a thread from receiving certain signals, so that only the thread that is supposed to handle a signal actually receives it.
As the code below shows, the main work is to build a signal set that contains all signals, install it as the thread's mask with pthread_sigmask, and then call the uv__signal_lock function. For comparison, this is how the pthread_sigmask manual page example uses it:
// pthread_sigmask example
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
Copy the code
/*
 * Replace the calling thread's signal mask with a full set, store the previous
 * mask in *saved_sigmask, then take the signal lock via uv__signal_lock().
 *
 * Aborts the process if any of the three steps fails.
 */
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
  sigset_t block_all;
  int rc;

  rc = sigfillset(&block_all);
  if (rc != 0)
    abort();

  /* Clear the output set first, purely to keep valgrind quiet. */
  sigemptyset(saved_sigmask);

  rc = pthread_sigmask(SIG_SETMASK, &block_all, saved_sigmask);
  if (rc != 0)
    abort();

  rc = uv__signal_lock();
  if (rc != 0)
    abort();
}
Copy the code
uv__signal_lock
thread1_worker > uv_signal_start > uv__signal_block_and_lock > uv__signal_lock
Uv__signal_lock_pipefd [0] is read with read, and a polling retry is attempted when an EINTR error occurs. An EINTR error occurs when a signal is sent while a system call is in progress.
Many system calls will report EINTR error codes if a signal occurs while the system call is in progress. No error actually occurred, but it was reported this way because the system could not recover the system call automatically. This encoding mode simply retries the system call when this happens, ignoring the interrupt.
/*
 * Take the signal lock by reading one byte from uv__signal_lock_pipefd[0].
 * A read interrupted by a signal (EINTR) is retried.
 *
 * Returns 0 when the read did not fail, 1 when it failed for any reason other
 * than EINTR.
 */
static int uv__signal_lock(void) {
  char byte;
  int nread;

  for (;;) {
    nread = read(uv__signal_lock_pipefd[0], &byte, sizeof byte);
    if (nread >= 0 || errno != EINTR)
      break;
  }

  return nread < 0 ? 1 : 0;
}
Copy the code
So when is there data to read, allowing the program to continue?
At this point I was a bit stuck, so I went back to the beginning and traced everything again without skipping any detail. The write turned out to be in the create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init function.
uv__signal_global_init analysis
If uv__signal_lock_pipefd has not been set yet, pthread_atfork is called.
pthread_atfork takes three handlers: the first runs before fork is called, the second runs in the parent process after fork, and the third runs in the child process after fork.
Here only the third handler is registered: uv__signal_global_reinit runs in the child process after fork, and it is also called directly at the end of uv__signal_global_init.
/*
 * Global signal setup: register uv__signal_global_reinit as the child-side
 * pthread_atfork handler, then run it once directly.
 *
 * The fork handler is only registered while uv__signal_lock_pipefd[0] is
 * still -1. Aborts the process if pthread_atfork() fails.
 */
static void uv__signal_global_init(void) {
  /* https://man7.org/linux/man-pages/man3/pthread_atfork.3.html */
  if (uv__signal_lock_pipefd[0] == -1 &&
      pthread_atfork(NULL, NULL, &uv__signal_global_reinit) != 0) {
    abort();
  }

  uv__signal_global_reinit();
}
Copy the code
uv__signal_global_reinit
create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init > uv__signal_global_reinit
The first uv_loop_init call runs uv__signal_global_reinit once (through uv__signal_global_once_init). It creates the lock pipe through uv__make_pipe and finally calls uv__signal_unlock once to write one byte. Thus, when one thread enters the uv__signal_lock logic, it reads that byte and continues to run, while the other threads keep waiting in read until a byte is written back, which achieves mutual exclusion. I was a little confused as to why a mutex is not used directly… A likely reason is that the lock is also needed inside the signal handler (uv__signal_handler releases it with uv__signal_unlock), and read/write are async-signal-safe whereas pthread_mutex_lock is not.
For comparison: when pthread_mutex_lock() returns, the mutex is locked. A thread calls this function to lock the mutex, and if the mutex is already locked and owned by another thread, the calling thread blocks until the mutex becomes available.
/*
 * Rebuild the global signal lock: run uv__signal_cleanup(), create the lock
 * pipe, and write the initial byte that leaves the lock in the released state.
 * Aborts the process if the pipe cannot be created or the write fails.
 */
static void uv__signal_global_reinit(void) {
  uv__signal_cleanup();

  /* Short-circuit keeps the original order: unlock only runs when the pipe
   * was created successfully. */
  if (uv__make_pipe(uv__signal_lock_pipefd, 0) || uv__signal_unlock())
    abort();
}

/*
 * Release the signal lock by writing one byte to uv__signal_lock_pipefd[1].
 * A write interrupted by a signal (EINTR) is retried.
 *
 * Returns 0 when the write did not fail, 1 when it failed for any reason other
 * than EINTR.
 */
static int uv__signal_unlock(void) {
  char token = 42;
  int nwritten;

  for (;;) {
    nwritten = write(uv__signal_lock_pipefd[1], &token, sizeof token);
    if (nwritten >= 0 || errno != EINTR)
      break;
  }

  return nwritten < 0 ? 1 : 0;
}
Copy the code
uv__signal_first_handle
thread1_worker > uv_signal_start > uv__signal_first_handle
Back to uv__signal_start: RB_NFIND is used to check whether a handle has already been registered for signum. The main reason for this is that sigaction can bind only one handler function to each signum.
static uv_signal_t* uv__signal_first_handle(int signum) { /* This function must be called with the signal lock held. */ uv_signal_t lookup; uv_signal_t* handle; lookup.signum = signum; lookup.flags = 0; lookup.loop = NULL; handle = RB_NFIND(uv__signal_tree_s, &uv__signal_tree, &lookup); if (handle ! = NULL && handle->signum == signum) return handle; return NULL; }Copy the code
RB_NFIND
thread1_worker > uv_signal_start > uv__signal_first_handle > RB_NFIND
Like QUEUE, the red-black tree is implemented as a set of macro definitions; the code is in the deps/uv/include/uv/tree.h file.
signum is a number, so handles can be searched and traversed efficiently using the red-black tree structure.
uv__signal_register_handler
thread1_worker > uv_signal_start > uv__signal_register_handler
Sets the signal handler function for signum to uv__signal_handler.
sa_flags: sets options that modify the behavior of signal handling. The following values are available and can be combined with bitwise OR (|):
- SA_NOCLDSTOP: if signum is SIGCHLD, the parent process is not notified when a child process is stopped
- SA_ONESHOT/SA_RESETHAND: restores the signal's default action upon entry to the signal handler, so the handler runs only once
- SA_RESTART: system calls interrupted by the signal are restarted automatically
- SA_NOMASK/SA_NODEFER: does not block this signal while its handler is running, so it can be delivered again before the handler has finished
static int uv__signal_register_handler(int signum, int oneshot) {
/* When this function is called, the signal lock must be held. */
struct sigaction sa;
/* XXX use a separate signal stack? */
memset(&sa, 0, sizeof(sa));
if (sigfillset(&sa.sa_mask))
abort();
sa.sa_handler = uv__signal_handler;
sa.sa_flags = SA_RESTART;
if (oneshot)
sa.sa_flags |= SA_RESETHAND;
/* XXX save old action so we can restore it later on? */
if (sigaction(signum, &sa, NULL))
return UV__ERR(errno);
return 0;
}
Copy the code
uv__signal_handler
thread1_worker > uv_signal_start > uv__signal_register_handler > uv__signal_handler
As the only signal handler function, let's look at the implementation of uv__signal_handler.
- Starting from uv__signal_first_handle, RB_NEXT iterates over the previously inserted handles whose signum equals the signum of the signal just received.
- For each of them, a message is written to the write end of the signal pipe of that handle's loop.
- In the fifth phase of the event loop, "poll for I/O", epoll reports the read end of the pipe as readable, which notifies the I/O observer set up by uv__io_init above and invokes the observer's callback, the uv__signal_event function in this example.
/*
 * The sigaction handler installed for every watched signal.
 *
 * For each handle in uv__signal_tree that watches `signum`, writes one message
 * (signum + handle) to the signal pipe of that handle's loop, then releases
 * the signal lock and restores errno.
 *
 * The leading `...` is an elision in this excerpt: the declarations of msg,
 * handle and saved_errno, and whatever precedes the loop, are not shown.
 */
static void uv__signal_handler(int signum) {
  ...

  for (handle = uv__signal_first_handle(signum);
       handle != NULL && handle->signum == signum;
       handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
    int r;

    msg.signum = signum;
    msg.handle = handle;

    /* write() should be atomic for small data chunks, so the entire message
     * should be written at once. In theory the pipe could become full, in
     * which case the user is out of luck.
     */
    do {
      r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
    } while (r == -1 && errno == EINTR);

    /* Either the whole message was written, or the write failed with
     * EAGAIN/EWOULDBLOCK. */
    assert(r == sizeof msg ||
           (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)));

    /* Only count signals whose message actually reached the pipe. */
    if (r != -1)
      handle->caught_signals++;
  }

  uv__signal_unlock();
  errno = saved_errno;
}
Copy the code
uv__signal_event
thread1_worker > uv_signal_init > uv__signal_loop_once_init > uv__signal_event
The callback set on the signal I/O observer.
- Loops over all messages that were written to the pipe; there can be several of them.
- If the signum in a message matches the signum the handle is listening for, the handle->signal_cb callback is called.
/*
 * I/O watcher callback for the read end of a loop's signal pipe.
 *
 * Reads uv__signal_msg_t records from loop->signal_pipefd[0] into a local
 * buffer with room for 32 records and, for every complete record, invokes the
 * callback of the handle it names. The parameters `w` and `events` are not
 * used in this function.
 */
static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv__signal_msg_t* msg;
  uv_signal_t* handle;
  char buf[sizeof(uv__signal_msg_t) * 32];
  size_t bytes, end, i;
  int r;

  /* `bytes` counts buffered bytes not yet dispatched; `end` is the length of
   * the complete records in the buffer. */
  bytes = 0;
  end = 0;

  do {
    /* Append to whatever partial record is already buffered. */
    r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);

    /* NOTE(review): `continue` inside do/while jumps to the loop condition,
     * so this path only reads again when `end == sizeof buf`; otherwise the
     * loop ends. Confirm this is the intended behavior. */
    if (r == -1 && errno == EINTR)
      continue;

    if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      /* If there are bytes in the buffer already (which really is extremely
       * unlikely if possible at all) we can't exit the function here. We'll
       * spin until more bytes are read instead.
       */
      if (bytes > 0)
        continue;

      /* Otherwise, there was nothing there. */
      return;
    }

    /* Other errors really should never happen. */
    if (r == -1)
      abort();

    bytes += r;

    /* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
    end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);

    /* Dispatch every complete record in the buffer. */
    for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
      msg = (uv__signal_msg_t*) (buf + i);
      handle = msg->handle;

      /* The callback only runs when the handle's current signum still matches
       * the one recorded in the message. */
      if (msg->signum == handle->signum) {
        assert(!(handle->flags & UV_HANDLE_CLOSING));
        handle->signal_cb(handle, handle->signum);
      }

      /* Counted whether or not the callback ran. */
      handle->dispatched_signals++;

      if (handle->flags & UV_SIGNAL_ONE_SHOT)
        uv__signal_stop(handle);
    }

    bytes -= end;

    /* If there are any "partial" messages left, move them to the start of
     * the buffer, and spin. This should not happen.
     */
    if (bytes) {
      memmove(buf, buf + end, bytes);
      continue;
    }
  } while (end == sizeof buf);  /* a completely full buffer: read again */
}
Copy the code
summary
In short, a single signal handler function, uv__signal_handler, is registered with sigaction the first time uv__signal_start is called for a signum. When the signal is received, that function iterates through all the handles in the red-black tree that care about this signum and writes a message to the write end of the pipe (created with pipe2) belonging to each handle's loop. In the poll phase of the event loop, epoll detects the data and notifies the loop's I/O observer. Finally, the observer's callback is invoked, and it calls the callback of every listening handle.
Read more: github.com/xiaoxiaojx/… If you are interested, you can click Star to support it. Thank you for reading