How libevent supports multithreading
Libevent code reading – inter-thread communication and signal handling \
\
Libevent itself isn’t thread-safe, but that doesn’t mean it can’t support multithreading.
A few days ago, Ruanyf posted a microblog about the concurrency models of Apache and Nginx,
so I looked things up again and summarized the concurrency models of the network libraries and servers I have studied:
1. muduo / Netty: one loop per thread. The main Reactor accepts connections and registers them to the child-thread Reactors (sub-Reactors) by round-robin.
In addition, muduo provides a runInLoop() function: if the caller is already in the loop’s own thread, the functor runs synchronously; if it is called from another thread, the IO thread is woken up to execute the functor. How is this cross-thread invocation implemented? The Reactor is likely blocked in poll/epoll. The traditional method is a pipe (as in memcached’s multithreading); muduo uses eventfd(2): the callback is put into the thread’s task queue and a uint64-sized message is written to wake up the Reactor.
2. nginx: the master + worker mode. Ruanyf’s microblog says the master accepts connections and assigns them to workers, but that is not actually the case: each worker uses accept_mutex to decide whether to add the listening socket to its own loop. One loop per process.
3. libevent: it is not thread-safe, but that does not mean it does not support multithreading. The network part of memcached uses libevent, and there is a classic diagram describing its multithreaded implementation: \
This message-notification + synchronization-layer mechanism is implemented with a pipe and a locked task queue (CQ), similar to muduo’s eventfd. \
\
A brief summary of recent code learning: \
The most common model for high-performance network programs combines non-blocking IO with IO multiplexing and is usually referred to as the Reactor pattern. There is another pattern called Proactor, but at present most implementations are based on the Reactor model.
In the Reactor model, the basic structure is an event loop that implements business logic in an event-driven, event-callback manner. The events include socket reads and writes and connection establishment, all handled in a non-blocking way, which improves concurrency and throughput and suits IO-intensive applications. The callback style, however, fragments the business logic. The Reactor pattern is typically a combination of non-blocking IO and a single-threaded loop: the non-blocking
IO + one loop per thread model.
The loop is the thread’s main loop, to which timers and IO events are registered. When real-time requirements are high, a single thread can handle connections while data processing is handed off to a thread pool, a 1 + N approach. Non-blocking IO and timeouts keep the loop running.
To throw data from one thread to another, object-oriented languages (Java and C++) provide BlockingQueue; in C there is mqueue, the POSIX message queue described in UNP Volume 2.
Both sockets and pipes in Linux are fd-based, so both work with select, poll, and so on. High-performance frameworks usually take one of two approaches: multiple single-threaded processes (nginx uses a master process plus multiple worker processes) or a single multithreaded process. Processes that call fork typically do not also use multithreading. The purposes of calling fork are: 1. executing another program; 2. having the parent and child processes cooperate on a task, communicating through shared file descriptors.
\
I briefly read the sample of Libevent. The basic implementation process is as follows:
First we allocate a base to which all subsequent operations will be associated:
base = event_base_new();
Then we set up a series of event control blocks, which in effect registers the corresponding event handlers:
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
event_add(signal_event, NULL);
Finally enter the loop, waiting for the generation and execution of the corresponding event:
event_base_dispatch(base);
The sections below analyze the basic framework and logic of libevent in detail.
\
A pipe is used to implement notifications between threads in libevent:
The specific implementation is in the evthread_make_base_notifiable() function. It uses socketpair(), which creates a connected pair of sockets, to implement notifications between threads:
evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
base->th_notify_fd);
First, two functions are defined: a notification function, used by other threads to wake the thread running libevent, and a callback function, run in the libevent thread to handle the notification.
/* The callback that drains a notification sent from another thread */
void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default;
/* The notification function called from other threads */
int (*notify)(struct event_base *) = evthread_notify_base_default;
The notify function stored in event_base is evthread_notify_base_default(), which sends data to pipe[0] by writing to pipe[1] of the socket pair:
base->th_notify_fn = notify;
static int
evthread_notify_base_default(struct event_base *base)
{
char buf[1];
int r;
buf[0] = (char) 0;
r = write(base->th_notify_fd[1], buf, 1);
return (r < 0 && errno != EAGAIN) ? -1 : 0;
}
The thread running libevent therefore needs to listen for read events on pipe[0] and respond with a callback that actually reads what was sent. Libevent registers an event on the pipe[0] read end:
/* pipe[0] reads the data, i.e. receives the notification; pipe[1] is written from other threads */
event_assign(&base->th_notify, base, base->th_notify_fd[0],
EV_READ|EV_PERSIST, cb, base);
where cb is the callback defined above:
static void
evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
{
unsigned char buf[1024];
struct event_base *base = arg;
while (read(fd, (char*)buf, sizeof(buf)) > 0)
;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
base->is_notify_pending = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
Thus pipe[0,1] neatly implements cross-thread event notification, and event_base cleanly wraps the notification functions between threads. Both ends, of course, use non-blocking IO.
\
How signals are handled:
Event loops built on IO multiplexing must also account for signals when the multiplexing backend is initialized. In Linux, a signal handler and the normal program flow are two different control flows; to communicate between them, libevent reuses its cross-thread notification mechanism, again based on socketpair(). The specific implementation is as follows:
The select initialization function initializes signal handling via evsig_init(base):
static void *
select_init(struct event_base *base)
{
struct selectop *sop;
...
evsig_init(base);
...
return (sop);
}
\
The implementation of evsig_init is as follows: \
int
evsig_init(struct event_base *base)
{
/* The signal mechanism is implemented using a socket pair */
evutil_socketpair( AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair);
...
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]);
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);
/* ev_signal_pair[0] is written by the signal handler;
   ev_signal_pair[1] is read by the event loop */
event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],
EV_READ | EV_PERSIST, evsig_cb, base);
...
/* The operation table used when an actual signal occurs */
base->evsigsel = &evsigops;
}
The callback evsig_cb:
static void
evsig_cb(evutil_socket_t fd, short what, void *arg)
{
...
while (1) {
/* Read data from the socket; it is non-blocking, so repeated reads exit instead of looping forever */
n = recv(fd, signals, sizeof(signals), 0);
if (n == -1) {
...
break;
} else if (n == 0) {
break;
}
/* Count the number of times each signal was triggered */
for (i = 0; i < n; ++i) {
ev_uint8_t sig = signals[i];
if (sig < NSIG)
ncaught[sig]++;
}
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
for (i = 0; i < NSIG; ++i) {
if (ncaught[i])
/* Activate the corresponding signal event */
evmap_signal_active(base, i, ncaught[i]);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
\
The structure evsigops:
static const struct eventop evsigops = {
"signal",
NULL,
evsig_add, /* the add operation */
evsig_del,
NULL,
NULL,
0, 0, 0
};
\
The add operation is evsig_add:
static int
evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
...
EVSIGBASE_LOCK();
evsig_base = base;
/* Count the added signal */
evsig_base_n_signals_added = ++sig->ev_n_signals_added;
/* The socket the signal handler writes to, i.e. pipe[0] */
evsig_base_fd = base->sig.ev_signal_pair[0];
EVSIGBASE_UNLOCK();
...
/* Set the handler for this signal number to evsig_handler */
_evsig_set_handler(base, (int)evsignal, evsig_handler);
/* Add the event */
event_add(&sig->ev_signal, NULL);

return (0);
}
\
evsig_handler():
This function performs the actual signal notification:
static void
evsig_handler(int sig)
{
ev_uint8_t msg = sig;
send(evsig_base_fd, (char*)&msg, 1, 0);
}
I will continue learning C to enrich my thinking and design ideas.
\