Golang makes it easy to spawn a goroutine to serve every TCP connection without worrying about performance. This is because Go internally implements an “asynchronous” IO model by combining goroutines with IO multiplexing, which lets developers ignore most of the underlying details and simply write the upper-level business logic. How is this asynchronous IO implemented? Let’s analyze it on Linux.

Under Unix/Linux, everything is a file, and each TCP connection corresponds to a socket handle, which can also be regarded as a file. Sending and receiving data on the socket is equivalent to reading and writing a file, so a socket handle is usually represented by a file descriptor (fd). You can look under /proc/pid/fd/ to see the fds a process occupies.
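As a quick illustration, here is a small sketch (my own addition, not from the original article) showing that a Go net.Conn is backed by exactly such an fd, which you can cross-check under /proc/<pid>/fd/:

import (
    "fmt"
    "net"
)

// printConnFd is a hypothetical helper: it dials a server and prints the fd
// backing the connection; the same number shows up under /proc/<pid>/fd/.
func printConnFd(addr string) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }
    defer conn.Close()
    raw, err := conn.(*net.TCPConn).SyscallConn()
    if err != nil {
        return err
    }
    return raw.Control(func(fd uintptr) {
        fmt.Println("socket fd:", fd) // compare with: ls -l /proc/<pid>/fd
    })
}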

The kernel allocates a read (receive) buffer and a write (send) buffer to each socket handle. Sending data means writing into the write buffer of the fd, and receiving data means reading from its read buffer. When the program calls write or send, the data is not necessarily transmitted right away; it is simply copied into the write buffer, and the kernel sends it to the destination when the time is right (for example, when enough data has accumulated).

The Go runtime still has to check regularly whether an fd is ready, so strictly speaking this is not true asynchronous IO, but non-blocking IO combined with IO multiplexing.

IO model

Let's borrow a few pictures from the textbooks.

Blocking IO

When a program tries to read from the buffer, there may be no data in it yet. The program then gets stuck in the system call, waiting for data to arrive; in other words, the process blocks until something can be read. This is blocking IO. To serve multiple clients you can use a thread-per-connection model, where each socket handle is served by its own thread so that only that thread blocks. This avoids blocking the whole process, but a significant amount of CPU is still wasted waiting for data, and dedicating a thread to every fd is expensive when there are many fds to handle.

Non-blocking IO

With non-blocking IO, when a program wants to read data and the buffer is empty, the call returns to the user program immediately, but the user program has to check again and again until the data is ready. This burns CPU for nothing.
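To make this concrete, here is a minimal sketch (my own illustration, using Go's syscall package on Linux and assuming fd is an already-connected TCP socket) of the non-blocking busy-polling pattern:

import "syscall"

// readNonBlocking marks the fd non-blocking, then keeps retrying the read
// until the kernel has data, wasting a round trip on every EAGAIN.
func readNonBlocking(fd int) ([]byte, error) {
    if err := syscall.SetNonblock(fd, true); err != nil {
        return nil, err
    }
    buf := make([]byte, 4096)
    for {
        n, err := syscall.Read(fd, buf)
        if err == syscall.EAGAIN {
            continue // nothing in the receive buffer yet; spin and try again
        }
        if err != nil {
            return nil, err
        }
        return buf[:n], nil
    }
}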

IO multiplexing

IO multiplexing, on the other hand, uses a single thread to manage multiple fds. The fds are registered with an IO multiplexing function; each call passes in the fds to be checked, and if any of them are ready, the ready fds are returned so the thread can process them one by one. This allows one thread to manage many fds, which is comparatively efficient. Common IO multiplexing functions are select, poll, and epoll. The biggest drawback of select and poll is that every call has to pass in all the fds to be monitored, and the kernel iterates over all of them again. Under high concurrency, copying data between user space and kernel space and polling the fds wastes a lot of system resources (select and poll are not discussed further here).

Introducing epoll

Next, let's look at the epoll system calls.

Epoll is more flexible and efficient than select and poll. It exposes three system calls to the user, and Golang's “asynchronous” IO is built on these three system calls combined with goroutines.

// Create and return an epfd handle, which is later used to add and remove fds.
int epoll_create(int size);
// Add, delete, or modify the fds monitored by epfd.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
// Pass in the epfd handle returned above and a timeout; returns the ready fd events.
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
  • Calling epoll_create creates an eventpoll object in the kernel, which maintains an epitem collection; this can be understood simply as a collection of fds.

  • Calling epoll_ctl wraps an fd into an epitem, adds it to the eventpoll object, and registers a callback with the kernel for that epitem; the callback is triggered when the fd's status changes and adds the epitem to the eventpoll's ready list rdlist.

  • When data arrives, the interrupt handler copies the data into the socket buffer of the fd; the state of the fd's buffer changes, and the callback adds the epitem corresponding to the fd to the rdlist ready queue.

  • epoll_wait does not need to iterate over all fds; it simply returns the ready rdlist queue. If the rdlist queue is empty, it blocks waiting, or waits until the timeout expires.

The general working principle is shown in the figure below.
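To make the three calls concrete, here is a minimal sketch (my own example, Linux only, most error handling elided) that drives epoll directly from Go's syscall package against a listening socket on port 9009:

package main

import (
    "fmt"
    "syscall"
)

func main() {
    // epoll_create: create the eventpoll object and get its handle.
    epfd, err := syscall.EpollCreate1(0)
    if err != nil {
        panic(err)
    }
    defer syscall.Close(epfd)

    // A non-blocking listening socket to monitor (errors elided for brevity).
    listenFd, _ := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM|syscall.SOCK_NONBLOCK, 0)
    defer syscall.Close(listenFd)
    syscall.Bind(listenFd, &syscall.SockaddrInet4{Port: 9009})
    syscall.Listen(listenFd, 128)

    // epoll_ctl: register the fd and the events we care about.
    ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(listenFd)}
    if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, listenFd, &ev); err != nil {
        panic(err)
    }

    // epoll_wait: block until a registered fd becomes ready (a client connects),
    // retrying if the call is interrupted by a signal.
    events := make([]syscall.EpollEvent, 128)
    n, err := syscall.EpollWait(epfd, events, -1)
    for err == syscall.EINTR {
        n, err = syscall.EpollWait(epfd, events, -1)
    }
    if err != nil {
        panic(err)
    }
    for i := 0; i < n; i++ {
        fmt.Println("ready fd:", events[i].Fd)
    }
}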

Asynchronous IO

With asynchronous IO, when the user program wants to read data from an fd, it issues the system call, tells the kernel what it needs, and returns immediately to do something else. When the kernel has the data ready, it notifies the user program, and the user program then handles the event on the fd.

Golang asynchronous IO implementation idea

As we all know, goroutines are very cheap, and they have states such as blocked, ready, and running, so it is affordable to dedicate one goroutine to each fd. The events monitored on each fd are managed by the runtime, which ties goroutine scheduling to fd events. When a goroutine reads from an fd but no data is available, the runtime parks the goroutine (its status changes to _Gwaiting) and schedules other goroutines to run.

During scheduling, the runtime checks whether any fd is ready. If one is, the scheduler notifies the parked goroutine that its fd can be processed (its status changes to _Grunnable and it is added to the run queue). The goroutine then handles the fd's data. This avoids wasting CPU and also provides readiness notification, implementing an asynchronous IO model at the user level.

This is the general idea behind Golang's netpoll. Let's look at the code. This article is based on Go 1.14.

The specific implementation

Let's start with how Golang's netpoll is used.

A test case

We will explore this with a very simple demo.

package main

import (
    "fmt"
    "net"
    "os"
)

func main() {
    fmt.Println("Server process id: ", os.Getpid())
    lister, err := net.Listen("tcp", "0.0.0.0:9009")
    if err != nil {
        fmt.Println("Failed to listen: ", err)
        return
    }
    for {
        conn, err := lister.Accept() // wait for a connection
        if err != nil {
            fmt.Println("Failed to accept connection: ", err)
            continue
        }
        go func() {
            defer conn.Close()
            for {
                buf := make([]byte, 128)
                n, err := conn.Read(buf)
                if err != nil {
                    fmt.Println("Read error: ", err)
                    return
                }
                fmt.Println("Data read: ", string(buf[:n]))
            }
        }()
    }
}
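To exercise the demo you can connect with nc 127.0.0.1 9009 and type something, or use a tiny client like the following sketch (my own addition, not part of the original example):

package main

import (
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:9009")
    if err != nil {
        fmt.Println("dial failed: ", err)
        return
    }
    defer conn.Close()
    // Write copies the bytes into the kernel send buffer; the server's parked
    // goroutine is woken up once the data reaches its receive buffer.
    if _, err := conn.Write([]byte("hello netpoll")); err != nil {
        fmt.Println("write failed: ", err)
    }
}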

Inside net.Listen

net.Listen goes through lc.Listen -> sl.listenTCP -> internetSocket -> socket, all the way down to fd.listenStream, creating a socket fd along the way.

The next step is to add this socket fd to an eventpoll object so that the socket can be monitored.

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ......
    // Bind the socket
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // Listen on the socket
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // Initialize the fd, i.e. put the socket into epoll
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

func (fd *FD) Init(net string, pollable bool) error {
    ......
    // Add the socket fd to the poller
    err := fd.pd.init(fd)
    ......
}

// runtime_pollServerInit and runtime_pollOpen are linked to functions in the runtime package
func (pd *pollDesc) init(fd *FD) error {
    // sync.Once: epoll_create is called only once to create the global eventpoll object
    serverInit.Do(runtime_pollServerInit)
    // Add the socket fd to the eventpoll object (epoll_ctl under the hood)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

Let's look at runtime_pollServerInit, which wraps epoll_create.

func poll_runtime_pollServerInit() {
    // Initialize the global epoll object
    netpollinit()
    // Set the global flag to 1
    atomic.Store(&netpollInited, 1)
}

func netpollinit() {
    // System call: create an eventpoll object
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    ......
}

Now take a look at the runtime_pollOpen method, which adds the socket fd being listened on to the eventpoll object. It is essentially a wrapper around epoll_ctl.

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    // Allocate a pollDesc that describes this fd in the poller
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    unlock(&pd.lock)
    var errno int32
    // epoll_ctl system call
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // Edge-triggered (ET) mode: an event must be handled each time it fires, otherwise it is lost
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    // Record the pd pointer in the event's data field
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    // Add the fd to the eventpoll object
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

Inside Accept

Next, return to the main function.

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ......
    // Check whether the fd state is ok
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        // accept system call: if a connection request has arrived on the listening socket,
        // it returns the new socket's fd; otherwise it returns an EAGAIN error, handled below
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // Not ready yet: park the current goroutine until the fd becomes readable
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    // runtime_pollWait jumps into the runtime package; this is where the goroutine is parked
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ......
    // netpollblock is the function that actually blocks the goroutine
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    ......
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // Park the goroutine. Note the first two arguments: netpollblockcommit is a callback,
        // and gpp is the rg or wg field of the current pd
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    ......
    // lock is the gpp pointer
    mp.waitlock = lock
    // unlockf is the netpollblockcommit function
    mp.waitunlockf = unlockf
    ......
}

func park_m(gp *g) {
    // Get the current goroutine
    _g_ := getg()
    // Change the status to _Gwaiting
    casgstatus(gp, _Grunning, _Gwaiting)
    // Detach m from g
    dropg()
    if fn := _g_.m.waitunlockf; fn != nil {
        // Call the function argument passed in above, i.e. netpollblockcommit
        ok := fn(gp, _g_.m.waitlock)
        // Clear the fields
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // schedule it again
        }
    }
    ......
}

// netpollblockcommit records the parked goroutine into pd.rg or pd.wg (the gpp pointer),
// so it can be found and woken up when the fd becomes ready
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Count the goroutines waiting on the poller
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

At this point the Accept call is blocked. When the status of the socket fd listening on 0.0.0.0:9009 changes (that is, a new client requests a connection), the runtime makes the parked goroutine ready again.

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ......
    for {
        // 2. The accept system call gets the new connection; Linux assigns it a new fd,
        // which this function returns
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            // 3. Return the new connection's fd
            return s, rsa, "", err
        }
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 1. The goroutine parked here is woken up and loops back to call accept again
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        ......
        }
        ......
    }
    ......
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept()
    ......
    // Wrap the new connection's fd into a netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // Just like the listening fd, add the new connection's fd to epoll to monitor its read/write events
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

Waking up the parked goroutine

During goroutine scheduling, Go executes the epoll_wait system call to check which fds have changed state, pulls them out, and wakes up the corresponding goroutines to handle them. This corresponds to the netpoll method in the runtime.

schedule() -> findrunnable() -> netpoll()

func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
    // First try to find a runnable g in the local queue and the global queue
    ......
    // If the poller is initialized, goroutines are waiting on it, and the last poll is not too recent,
    // call netpoll
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // netpoll(false) does not block: epoll_wait returns the kernel eventpoll's rdlist immediately
        if list := netpoll(false); !list.empty() {
            gp := list.pop()
            // Put the rest of the list into the scheduler's run queue and clear glist
            injectglist(&list)
            // Change gp's status to runnable
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            // Return a runnable g
            return gp, false
        }
    }
    ......
    stopm()
    goto top
}

// A further wrapper around epoll_wait
func netpoll(block bool) gList {
    if epfd == -1 {
        return gList{}
    }
    waitms := int32(-1)
    if !block {
        waitms = 0
    }
    // Declare an array of epoll events; epoll_wait fills it and returns the number of ready events,
    // which we then iterate over to fetch the ready fd events
    var events [128]epollevent
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        // Translate the epoll event flags into read/write mode flags
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // Recover the pollDesc pointer stored in the event data
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // Collect the goroutines parked on this pd into the toRun list
            netpollready(&toRun, pd, mode)
        }
    }
    if block && toRun.empty() {
        goto retry
    }
    return toRun
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // Add the previously parked goroutines to the gList
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

Inside conn.Read

Back in the main function, we use go func to handle each TCP connection in its own goroutine, and each goroutine calls conn.Read. If the buffer is not readable, this goroutine is parked as well, waiting for the socket fd to become readable, and the scheduler wakes it up through the netpoll function.

func main() {
    ......
    go func() {
        defer conn.Close()
        for {
            buf := make([]byte, 128)
            // Read the contents of the receive buffer into buf
            n, err := conn.Read(buf)
            ......
        }
    }()
}

func (fd *FD) Read(p []byte) (int, error) {
    ......
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                // The buffer is not readable: park the goroutine, just like Accept does
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        ......
    }
}

Once the fd becomes readable or writable, schedule calls netpoll, which uses epoll_wait to detect the change and wake up the goroutine. See the netpoll code above; it is not repeated here.

Golang also provides a way to delete an item from epoll, wrapped in the poll_runtime_pollClose function.

func poll_runtime_pollClose(pd *pollDesc) {
    ......
    // Remove the fd from the eventpoll object
    netpollclose(pd.fd)
    // Release the corresponding pollDesc
    pollcache.free(pd)
}

// Call epoll_ctl with EPOLL_CTL_DEL to remove the fd from the eventpoll object
func netpollclose(fd uintptr) int32 {
    var ev epollevent
    return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

A look at some system calls

Finally, let's capture some of the system calls with strace to see the general flow of interaction between the program and the kernel.

$ strace -f ./server

Some of the system call functions are as follows.

#... Println(" server process id: ",os.Getpid()) [pid 30307] getpid() = 30307 [pid 30307] write(1, "346234215345212241347253257350277233347250213 id357274232 30307 n", 27 server process id: 30307) = 27... [pid 30308] <... Nanosleep resumed> NULL) = 0 # Open the system file that defines the maximum number of TCP connections. And join the epoll node [30307] pid openat (AT_FDCWD, "/ proc/sys/net/core/somaxconn", O_RDONLY | O_CLOEXEC < unfinished... > [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ... > [pid 30307] <... Openat resumed>) = 4 Eventpoll [pid 30307] epoll_create1(EPOLL_CLOEXEC) = 5 # Add fd to epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0 [pid 30307] fcntl(4, F_GETFL) = 0x8000 (flags O_RDONLY|O_LARGEFILE) [pid 30307] fcntl(4, F_SETFL, O_RDONLY|O_NONBLOCK|O_LARGEFILE) = 0 [pid 30308] <... nanosleep resumed> NULL) = 0 [pid 30307] read(4, <unfinished ... > # Execute epoll_wait to view ready events [PID 30308] epoll_pwait(5, < Unfinished... > [pid 30307] <... read resumed> "512n", 65536) = 4 [pid 30308] <... epoll_pwait resumed> [{EPOLLIN|EPOLLOUT, {u32=2174189320, u64=139635855949576}}], 128, 0, NULL, 139635812673280) = 1 [pid 30307] read(4, <unfinished ... > [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ... > [pid 30307] <... Read resumed > ", "65532) = 0 # to/proc/sys/net/core/somaxconn file of fd is removed from the epoll [30307] pid epoll_ctl (5, EPOLL_CTL_DEL, 4, 0xC00005e8D4) = 0 # Close somaxConn descriptor [PID 30307] close(4) = 0 SO_REUSEADDR, [1], 4) = 0 [pid 30307] bind(3, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0 [pid 30307] listen(3, 512 <unfinished ... > [pid 30308] <... nanosleep resumed> NULL) = 0 [pid 30307] <... listen resumed> ) = 0 [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ... > # will be used in listening socket fd join the epoll [30307] pid epoll_ctl (3, 5, EPOLL_CTL_ADD, {EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0 [pid 30307] getsockname(3, {sa_family=AF_INET6, sin6_port=htons(9009), Inet_pton (AF_INET6, "::", &SIN6_addr), sin6_flowInfo =htonl(0), sin6_scope_id=0}, [112->28]) =0 Error [PID 30307] AccepT4 (3, 0xC00005eb98, [112], EAGAIN SOCK_CLOEXEC | SOCK_NONBLOCK) = - 1 Resource temporarily unavailable) # check whether there is a ready fd, the call is blocked, [PID 30307] epoll_pWAIT (5, [], 128, 0, NULL, 0) = 0 [PID 30308] <... Nanosleep resumed> NULL) = 0 # Check if any fd is ready, and this time block until a connection comes in [PID 30307] epoll_pwait(5, < Unfinished... > [pid 30308] futex(0x60dc70, FUTEX_WAIT_PRIVATE, 0, {tv_sec=60, tv_nsec=0} <unfinished ... > [pid 30307] <... epoll_pwait resumed> [{EPOLLIN, {u32=2174189320, u64=139635855949576}}], 128, -1, NULL, 0) = 1 [pid 30307] futex(0x60dc70, FUTEX_WAKE_PRIVATE, 1) = 1 [pid 30308] <... Futex resumed>) = 0 # new connection, meaning that a client connection has been received and assigned a FD is 4 [PID 30307] Accept4 (3, < Unfinished... >, <... 
Accept4 resumed> {sa_family=AF_INET6, sin6_port=htons(52082), inet_pton(AF_INET6, ":: FFFF :127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28], SOCK_CLOEXEC | SOCK_NONBLOCK) = 4 # 4 join the epoll management [30307] pid epoll_ctl (5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189112, u64=139635855949368}}) = 0 [pid 30307] getsockname(4, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, ":: FFFF :127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0 ...... [PID 30309] Epoll_ctl (5, EPOLL_CTL_DEL, 4, 0xC00005fDD4 < Unfinished... > [pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ... > [pid 30309] <... epoll_ctl resumed> ) = 0 [pid 30309] close(4) = 0 [pid 30309] epoll_pwait(5, [], 128, 0, NULL, 824634114048) = 0 # Unfinished [PID 30309] epoll_pWAIT (5, < Unfinished... >...Copy the code

References

  • “Background Development: Core Technologies and Application Practice”, Chapter 7: Network IO Models

  • “Advanced Programming in the UNIX Environment”, Chapter 14: Advanced IO

  • draveness.me/golang/docs…

  • “Go netpoller native network model: a full source-code reveal”, mp.weixin.qq.com/s/3kqVry3uV…