A Tiny Introduction Asynchronous IO
Most beginner programmers start by blocking IO calls. If an IO call is synchronous, when you call it, it will not return until the operation is complete or enough time has passed for your network stack to drop automatically. When you call connect() on a TCP connection, for example, your operating system queues a SYN packet to the other side of the TCP connection on the host. It won’t give you control until you receive a SYN ACK packet on the other side or until enough time has passed that it decides to give up.
Here is a very simple example using a blocking network call. It opens a connection to www.google.com, sends a simple HTTP request, and prints the response to standard output.
Example: a simple blocking HTTP client
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For gethostbyname */
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
int main(int c, char **v)
{
const char query[] =
"GET / HTTP/1.0\r\n"
"Host: www.google.com\r\n"
"\r\n";
const char hostname[] = "www.google.com";
struct sockaddr_in sin;
struct hostent *h;
const char *cp;
int fd;
ssize_t n_written, remaining;
char buf[1024];
/* Look up the IP address for the hostname. Watch out; this isn't
threadsafe on most platforms. */
h = gethostbyname(hostname);
if (!h) {
fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
return 1;
}
if (h->h_addrtype != AF_INET) {
fprintf(stderr, "No ipv6 support, sorry.");
return 1;
}
/* Allocate a new socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket");
return 1;
}
/* Connect to the remote host. */
sin.sin_family = AF_INET;
sin.sin_port = htons(80);
sin.sin_addr = *(struct in_addr*)h->h_addr;
if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
perror("connect");
close(fd);
return 1;
}
/* Write the query. */
/* XXX Can send succeed partially? */
cp = query;
remaining = strlen(query);
while (remaining) {
n_written = send(fd, cp, remaining, 0);
if (n_written <= 0) {
perror("send");
return 1;
}
remaining -= n_written;
cp += n_written;
}
/* Get an answer back. */
while (1) {
ssize_t result = recv(fd, buf, sizeof(buf), 0);
if (result == 0) {
break;
} else if (result < 0) {
perror("recv");
close(fd);
return 1;
}
fwrite(buf, 1, result, stdout);
}
close(fd);
return 0;
}
Copy the code
The code above blocks all network calls: the gethostbyname function does not return until www.google.com has resolved successfully or failed; The connect function does not return until the connection is successful; The recv function does not return until it receives data or a close; The send function until it finally flushes its output to the kernel write buffer.
Now, IO blocking is not unfortunate. Blocking IO will work fine for you if your program doesn’t do anything else in the meantime. But suppose you need to write a program that handles multiple connections simultaneously. Let’s take a concrete example: let’s say you want to read input from two connections, but you don’t know which connection will input first. You can’t say
Bad case
/* These codes don't work */ char buf[1024]; int i, n; while (i_still_want_to_read()) { for (i=0; i<n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n==0) handle_close(fd[i]); else if (n<0) handle_error(fd[i], errno); else handle_input(fd[i], buf, n); }}Copy the code
When data arrives on fd[2], your program cannot read data on FD [2] until it has read data on FD [0] and FD [1].
Sometimes people solve this problem by using multithreading, or multi-process services. One of the simplest ways to do this is to use multiple threads, each thread handling one connection. In this way, each connection has its own process, and one connection’s IO blocking calls wait without affecting the other connection’s process blocking.
Here’s another example program. This is a trivial service program that listens on TCP connection port 40713, reads data from an input line, and writes out data after ROT13 processing. Here, Unix fork() is called for each incoming connection to create a new process.
Example: Server that branches out of ROT13
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#define MAX_LINE 16384
char rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
void child(int fd)
{
char outbuf[MAX_LINE+1];
size_t outbuf_used = 0;
ssize_t result;
while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
if (result == 0) {
break;
} else if (result == -1) {
perror("read");
break;
}
/* We do this test to keep the user from overflowing the buffer. */
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}
if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}
void
run(void)
{
int listener;
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);
listener = socket(AF_INET, SOCK_STREAM, 0);
#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif
if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}
if (listen(listener, 16)<0) {
perror("listen");
return;
}
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) {
perror("accept");
} else {
if (fork() == 0) {
child(fd);
exit(0);
}
}
}
}
int main(int c, char **v)
{
run();
return 0;
}
Copy the code
So, do we have a perfect solution to handle multiple connections at the same time? So can we stop writing this book now and do something else? First, process creation (or thread creation) can be quite expensive on some platforms. In real life, instead of creating a new process, you want to have a thread pool. But fundamentally, there aren’t as many threads as you might think. If your program needs to handle thousands of connections at the same time, it is not efficient to handle thousands of threads because the CPU processor can only handle a few threads.
But what if threads don’t resolve multiple connections? In Unix sockets, set your sockets to non-blocking. Set in Unix using the following function.
fcntl(fd, F_SETFL, O_NONBLOCK)
The file descriptor fd is created by the socket function. Once you set the socket descriptor fd to non-blocking, when you tell the network to call fd, the call will either complete immediately or return an error saying “I can’t make any progress now, please try again”. So our two socket examples can be written like this:
Bad example: Busy polling all sockets
/* This will work, but the performance will be unforgivably bad. */ int i, n; char buf[1024]; for (i=0; i < n_sockets; ++i) fcntl(fd[i], F_SETFL, O_NONBLOCK); while (i_still_want_to_read()) { for (i=0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); }}}Copy the code
Now, we use a non-blocking socket, and the code above will work, but barely. Performance will be poor for two main reasons. First, when there is no data to read on the connection, polling will continue forever, and your CPU will be completely occupied. Second, if you try to handle one or two connections using this method, you’ll make a kernel call for each one, whether it has data for you or not. So we need a way to tell the kernel “Wait for those sockets to have data for me and tell me those sockets are ready”.
The old solution was that people used the select() function all the time to solve this problem. The select() function calls three sets of FDS (implemented as a bit array) : one read, one write, and one exception handling. It waits until a socket is ready from one of the collections, and sets the collection to contain the sockets ready for use.
We have another example of this using select:
Example: Use select
/* If you only have a couple dozen fds, this version won't be awful */ fd_set readset; int i, n; char buf[1024]; while (i_still_want_to_read()) { int maxfd = -1; FD_ZERO(&readset); /* Add all of the interesting fds to readset */ for (i=0; i < n_sockets; ++i) { if (fd[i]>maxfd) maxfd = fd[i]; FD_SET(fd[i], &readset); } /* Wait until one or more fds are ready to read */ select(maxfd+1, &readset, NULL, NULL, NULL); /* Process all of the fds that are still set in readset */ for (i=0; i < n_sockets; ++i) { if (FD_ISSET(fd[i], &readset)) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); }}}}Copy the code
There is a POT13 server implemented with SELECT
Example: POT13 server implemented by select ()
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> /* for select */ #include <sys/select.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; int writing; size_t n_written; size_t write_upto; }; struct fd_state * alloc_fd_state(void) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (! state) return NULL; state->buffer_used = state->n_written = state->writing = state->write_upto = 0; return state; } void free_fd_state(struct fd_state *state) { free(state); } void make_nonblocking(int fd) { fcntl(fd, F_SETFL, O_NONBLOCK); } int do_read(int fd, struct fd_state *state) { char buf[1024]; int i; ssize_t result; while (1) { result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { state->writing = 1; state->write_upto = state->buffer_used; } } } if (result == 0) { return 1; } else if (result < 0) { if (errno == EAGAIN) return 0; return -1; } return 0; } int do_write(int fd, struct fd_state *state) { while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) return 0; return -1; } assert(result ! = 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 0; state->writing = 0; return 0; } void run(void) { int listener; struct fd_state *state[FD_SETSIZE]; struct sockaddr_in sin; int i, maxfd; fd_set readset, writeset, exset; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); for (i = 0; i < FD_SETSIZE; ++i) state[i] = NULL; listener = socket(AF_INET, SOCK_STREAM, 0); make_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); while (1) { maxfd = listener; FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); FD_SET(listener, &readset); for (i=0; i < FD_SETSIZE; ++i) { if (state[i]) { if (i > maxfd) maxfd = i; FD_SET(i, &readset); if (state[i]->writing) { FD_SET(i, &writeset); } } } if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) { perror("select"); return; } if (FD_ISSET(listener, &readset)) { struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { make_nonblocking(fd); state[fd] = alloc_fd_state(); assert(state[fd]); /*XXX*/ } } for (i=0; i < maxfd+1; ++i) { int r = 0; if (i == listener) continue; if (FD_ISSET(i, &readset)) { r = do_read(i, state[i]); } if (r == 0 && FD_ISSET(i, &writeset)) { r = do_write(i, state[i]); } if (r) { free_fd_state(state[i]); state[i] = NULL; close(i); } } } } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }Copy the code
But we’re not done. Because the events consumed to generate and read the SELECT () bit array will be proportional to the maximum FD supplied by the select. Calling ‘select()’ when there are a large number of sockets is bad.
Different operating systems offer different replacement functions for you to choose from, including poll(),epoll(),kqueue(),evports, and /dev/poll. These perform better than select and, with the exception of poll(), the time complexity of adding a socket, deleting a socket, and notifying a socket that IO is ready is O(1).
Unfortunately, there is no valid interface as a standard. Linux has epoll(),BSD has kqueue(),Solaris has evports and /dev/poll, etc. Different systems have different implementations. So, if you want to write a convenient asynchronous application with high performance, you need a unified interface that includes these abstract interfaces to provide effective solutions for different platforms.
There is an underlying Libevent API that provides this unified interface for you. It provides a unified interface for various select() alternatives, using the most efficient version running on any computer.
There is also another version of the asynchronous POT13 server implementation. Now we use libevent 2 instead of select(). Note that the fd_sets structure is now gone: instead, We connect and separate events via an event_base structure, which might be implemented according to select(),poll(),epoll(),kqueue(), etc.
Example: an underlying Libevent implementation of POT13 server
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (! state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (! state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (! state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024]; int i; ssize_t result; while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; } } } if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result ! = 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1; event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (! base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }Copy the code
There’s something in the code that needs to be noted: Sockets are of type ‘int’, and we’re using evutil_socket_t instead. Call evutil_make_socket_nonblocking instead of FCNTL (O_NONBLOCK) to set the socket non-blocking. These changes are our code compatible with Win32 network APIS)
How about that? Is that convenient? (How about on Windows?)
As you may have noticed, our code is getting more efficient and complex. Instead of managing buffers for each connection, each process is allocated a separate stack. We do not need to explicitly track which socket is being read or written: this is implicit in our code. We don’t need a design to keep track of how many operations have been completed: we just use loops and stack variables.
In addition, if you have extensive network programming experience on Windows, you will find that using the examples above will not achieve very good performance. On Windows, the fastest way to do asynchronous I/O is not to use an interface like SELECT () : it is to use the IOCP (IO Completion Ports) API. Unlike the other fastest network apis, IOCP does not notify your program when a socket is ready to operate, but notifies you when your operation is complete. Instead, the program tells the Windows network stack to start a network operation, and IOCP notifies the program when the operation is complete.
Fortunately, Libevent 2’s BufferEvents interface overcomes these shortcomings: it makes writing programs very simple and provides an interface that runs efficiently on Windows and Unix.
This is the last demonstration of the POT13 server, via the BufferEvents API
Example: a very simple POT13 server implemented with Libevent
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } void readcb(struct bufferevent *bev, void *ctx) { struct evbuffer *input, *output; char *line; size_t n; int i; input = bufferevent_get_input(bev); output = bufferevent_get_output(bev); while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { for (i = 0; i < n; ++i) line[i] = rot13_char(line[i]); evbuffer_add(output, line, n); evbuffer_add(output, "\n", 1); free(line); } if (evbuffer_get_length(input) >= MAX_LINE) { /* Too long; just process what there is and go on so that the buffer * doesn't grow infinitely long. */ char buf[1024]; while (evbuffer_get_length(input)) { int n = evbuffer_remove(input, buf, sizeof(buf)); for (i = 0; i < n; ++i) buf[i] = rot13_char(buf[i]); evbuffer_add(output, buf, n); } evbuffer_add(output, "\n", 1); } } void errorcb(struct bufferevent *bev, short error, void *ctx) { if (error & BEV_EVENT_EOF) { /* connection has been closed, do any clean up here */ /* ... */ } else if (error & BEV_EVENT_ERROR) { /* check errno to see what error occurred */ /* ... */ } else if (error & BEV_EVENT_TIMEOUT) { /* must be a timeout event handle, handle it */ /* ... */ } bufferevent_free(bev); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); bufferevent_enable(bev, EV_READ|EV_WRITE); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (! base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }Copy the code
Does all this really work?
Write a paragraph here about the efficiency of XXX, which is really outdated for Libevnet.
Link to Original English