Gunicorn ("Green Unicorn"), a port of the Ruby community's Unicorn, is a WSGI HTTP server. Once we understand Gunicorn, we can deploy the Bottle program from the previous article. As usual, this article is divided into the following parts:

  • Introduction to the Gunicorn project structure
  • Gunicorn use
  • Gunicorn – application implementation
  • Arbiter implementation
  • Sync – worker
  • Summary
  • Tips

Introduction to the Gunicorn project structure

This article reads the Gunicorn source code at version 20.0.0. The main files and packages are as follows:

File              Description
app package       Gunicorn's own Application classes (not the application defined by WSGI)
http package      Gunicorn's handling of the HTTP protocol
workers package   Gunicorn's worker implementations: the synchronous worker (sync), the thread-pool worker (gthread), and the asynchronous workers (geventlet, ggevent, etc.)
arbiter.py        Gunicorn's master implementation

The Gunicorn design documentation describes it this way:

Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.

In other words, Gunicorn uses a pre-fork worker model: the master forks a configured number of workers up front and then manages them. Every request and response is handled by a worker process.

We focus on how the Gunicorn server is implemented, and on how the master and the workers are implemented and cooperate.

Gunicorn use

First write a test app. As you can see, it is a WSGI-compliant application:

# myapp.py
def app(environ, start_response):
    # environ carries the request data; start_response is the callback
    # that receives the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])
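Because a WSGI application is just a callable, it can be exercised without any server. The sketch below calls the same app by hand; `call_wsgi` is a hypothetical helper written for this illustration, and the environ is filled in by the standard library's `wsgiref.util.setup_testing_defaults`.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_wsgi(application, path="/"):
    """Invoke a WSGI callable once and collect status, headers and body."""
    environ = {}
    setup_testing_defaults(environ)  # fills in a minimal, valid environ
    environ["PATH_INFO"] = path
    captured = {}

    def start_response(status, headers, exc_info=None):
        # The server side of the contract: remember what the app reported.
        captured["status"] = status
        captured["headers"] = dict(headers)

    body = b"".join(application(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_wsgi(app)
print(status, headers["Content-Length"], body)
```

A real server such as Gunicorn does the same thing in spirit: it builds environ from the parsed HTTP request and turns the status, headers and body back into an HTTP response.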

Start the service with four worker processes at the debug log level, loading myapp:app:

# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # configuration ready
...
[2021-02-23 18:01:12 +0800] [50462] [INFO]
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # Arbiter started
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # listening port
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464  # start the workers
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers

Use curl to test your service

# curl http://127.0.0.1:8000
Hello, World!

Meanwhile, the Gunicorn log shows that worker 50465 handled the HTTP request:

[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /

At run time, you can also increase the number of workers manually by sending a signal to the master:

# kill -TTIN 50462

The service log shows that the master process (50462) handles the TTIN signal and raises the number of workers to 5:

...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers

If you stop the service with Ctrl+C, you can see that the master process (50462) also handles the INT signal: it shuts down the workers first and then exits itself:

^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master

If you are not familiar with Gunicorn's parameters, you can view the help with the following command:

# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]

The help output is implemented with the familiar argparse module:

class Setting(object):

    def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)


# the --workers option
class Workers(Setting):
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """


# a method of the Config class
def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
                        action="version", default=argparse.SUPPRESS,
                        version="%(prog)s (version " + __version__ + ")\n",
                        help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)
    for k in keys:
        self.settings[k].add_option(parser)

    return parser
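The pattern above, where each setting object registers its own command-line option, can be tried in isolation. The following is a minimal sketch, not Gunicorn's code: the `Setting`, `Workers` and `Bind` classes and the `build_parser` function are simplified stand-ins made up for illustration.

```python
import argparse


class Setting:
    """Hypothetical, stripped-down stand-in for a Gunicorn setting."""
    name = None
    cli = None
    type = str
    default = None
    short = ""

    def add_option(self, parser):
        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")  # argparse %-formats help text
        # default=None lets the caller tell "not given on the CLI" apart
        # from an explicit value.
        parser.add_argument(*self.cli, dest=self.name, type=self.type,
                            default=None, help=help_txt)


class Workers(Setting):
    name = "workers"
    cli = ["-w", "--workers"]
    type = int
    default = 1
    short = "The number of worker processes for handling requests."


class Bind(Setting):
    name = "bind"
    cli = ["-b", "--bind"]
    default = "127.0.0.1:8000"
    short = "The socket to bind."


def build_parser(settings):
    parser = argparse.ArgumentParser(
        prog="demo", usage="%(prog)s [OPTIONS] [APP_MODULE]")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
    for setting in settings:
        setting.add_option(parser)  # each setting registers itself
    return parser


parser = build_parser([Workers(), Bind()])
opts = parser.parse_args(["-w", "4", "myapp:app"])
print(opts.workers, opts.bind, opts.args)
```

Keeping the option definition next to the setting's default and description means the help text, the CLI flag and the configuration value cannot drift apart.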

Gunicorn – application implementation

Gunicorn's Application is implemented mainly in the following three classes. Note that "application" here means the web-server application, whereas Bottle, Flask, and Django provide web-framework applications. The former dynamically loads the latter: the former runs the HTTP service, while the latter handles each individual HTTP request.

  • BaseApplication
    • Application
      • WSGIApplication

Condensed, the three classes fit the following code template:

class WSGIApplication(Application):

    def __init__(self, usage=None, prog=None):
        ...
        self.do_load_config()

    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)

    def init(self, parser, opts, args):
        ...
        self.app_uri = args[0]  # the application to load, e.g. myapp:app

    def load(self):
        return util.import_app(self.app_uri)  # import the WSGI application

    def run(self):
        self.load()
        Arbiter(self).run()  # hand control over to the Arbiter


def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()


if __name__ == '__main__':
    run()

The Application part is relatively simple, so we will not go into further detail.

Arbiter implementation

Arbiter, the de facto core of the master process, has the following code template:

class Arbiter(object):

    def __init__(self, app):
        ...
        self.worker_class = self.cfg.worker_class  # worker class
        self.num_workers = self.cfg.workers  # number of workers
        ...

    def start(self):
        ...
        self.init_signals()  # register the signal handlers
        ...
        sock.create_sockets(...)  # create the listening sockets

    def run(self):
        self.start()
        try:
            self.manage_workers()
            while True:
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # nothing queued: sleep until woken up
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()  # dispatch to the matching handler
                self.wakeup()  # wake up the main loop
        except (StopIteration, KeyboardInterrupt):
            ...
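The dispatch step in the loop, which maps a signal number to a `handle_<name>` method, can be sketched on its own. `MiniArbiter` and its `drain` method are hypothetical names for this illustration, and signals are appended to the queue by hand rather than delivered by the operating system.

```python
import signal


class MiniArbiter:
    """Hypothetical miniature of the dispatch step; not Gunicorn's class."""

    # Map signal numbers to lowercase names without the SIG prefix.
    SIG_NAMES = {
        getattr(signal, name): name[3:].lower()
        for name in ("SIGHUP", "SIGTTIN", "SIGTTOU", "SIGTERM")
    }

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.sig_queue = []
        self.log = []

    def handle_ttin(self):
        self.num_workers += 1

    def handle_ttou(self):
        if self.num_workers > 1:
            self.num_workers -= 1

    def drain(self):
        """Process every queued signal in arrival order."""
        while self.sig_queue:
            sig = self.sig_queue.pop(0)
            signame = self.SIG_NAMES.get(sig)
            # Look the handler up by name, as the loop above does.
            handler = getattr(self, "handle_%s" % signame, None)
            if handler is None:
                self.log.append("no handler for %s" % signame)
                continue
            handler()


arbiter = MiniArbiter()
arbiter.sig_queue += [signal.SIGTTIN, signal.SIGTTIN,
                      signal.SIGTTOU, signal.SIGHUP]
arbiter.drain()
print(arbiter.num_workers, arbiter.log)
```

Looking handlers up by name keeps the main loop free of a long if/elif chain: supporting another signal only requires adding one more `handle_*` method.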

Before looking at how the Arbiter works, we need to understand signals. On Linux, the following command lists the available signals:

# kill -l
 1) SIGHUP	 2) SIGINT	 3) SIGQUIT	 4) SIGILL	 5) SIGTRAP
 6) SIGABRT	 7) SIGBUS	 8) SIGFPE	 9) SIGKILL	10) SIGUSR1
 11) SIGSEGV	12) SIGUSR2	13) SIGPIPE	14) SIGALRM	15) SIGTERM
...
The first 15 signals mean:

  • 1 (SIGHUP): the terminal connection was closed; daemons conventionally reload their configuration
  • 2 (SIGINT): interrupt from the terminal (Ctrl+C)
  • 3 (SIGQUIT): quit from the terminal (Ctrl+\)
  • 4 (SIGILL): an illegal instruction was executed
  • 5 (SIGTRAP): trace or breakpoint trap, used by debuggers
  • 6 (SIGABRT): abnormal termination
  • 7 (SIGBUS): error on the system bus
  • 8 (SIGFPE): floating point error
  • 9 (SIGKILL): immediately terminate the process
  • 10 (SIGUSR1): user-defined signal
  • 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
  • 12 (SIGUSR2): user-defined signal
  • 13 (SIGPIPE): writing into a pipe that nobody is reading from
  • 14 (SIGALRM): a timer expired (alarm)
  • 15 (SIGTERM): terminate the process gracefully

Signals are events provided by the operating system that can be used to communicate across processes. Arbiter.init_signals does the following:

SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]

def init_signals(self):
    ...
    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)

def signal(self, sig, frame):
    # queue the signal (at most 5 pending) and wake up the main loop
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()
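The same idea, a handler that only queues the signal and leaves the real work to the main loop, can be tried in a standalone script. This is a sketch under assumptions: it runs on a Unix system in the main thread, and `SIG_QUEUE` and `enqueue` are illustrative module-level names rather than Gunicorn's attributes.

```python
import os
import signal
import time

SIG_QUEUE = []


def enqueue(sig, frame):
    # Keep the handler tiny: remember the signal, with at most 5 pending.
    if len(SIG_QUEUE) < 5:
        SIG_QUEUE.append(sig)


for sig in (signal.SIGUSR1, signal.SIGUSR2):
    signal.signal(sig, enqueue)  # must be called from the main thread

# Send two signals to ourselves, as `kill -USR1 <pid>` would from a shell.
os.kill(os.getpid(), signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGUSR2)

# Python runs handlers between bytecodes, so give them a moment to fire.
deadline = time.monotonic() + 1.0
while len(SIG_QUEUE) < 2 and time.monotonic() < deadline:
    time.sleep(0.01)

print([signal.Signals(s).name for s in SIG_QUEUE])
```

Doing almost nothing inside the handler is deliberate: a handler can interrupt the program at nearly any point, so deferring the actual work to the main loop avoids re-entrancy problems.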

The TTIN scale-up signal demonstrated earlier is handled as follows:

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # scale up
    self.manage_workers()  # reconcile the set of workers

The Arbiter's sleep and wakeup are implemented like this:

self.PIPE = pair = os.pipe()  # create the pipe

def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as e:
        ...
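This is the classic self-pipe technique: the sleeper blocks in `select()` on the read end of a pipe, and anything that wants to wake it writes one byte to the write end. A minimal standalone sketch follows; the module-level `sleep` and `wakeup` functions are simplified illustrations, and a timer thread stands in for the signal handler that would normally call `wakeup`.

```python
import os
import select
import threading
import time

read_fd, write_fd = os.pipe()


def wakeup():
    """Make the pipe readable so a pending sleep() returns early."""
    os.write(write_fd, b".")


def sleep(timeout=1.0):
    """Block until the pipe is readable or the timeout expires.

    Returns True if we were woken up, False on timeout.
    """
    ready, _, _ = select.select([read_fd], [], [], timeout)
    if not ready:
        return False
    os.read(read_fd, 1)  # consume the wake-up byte
    return True


# Timeout path: nothing writes to the pipe.
timed_out = sleep(0.05)

# Wake-up path: another thread writes while we are sleeping.
threading.Timer(0.05, wakeup).start()
start = time.monotonic()
woken = sleep(5.0)
elapsed = time.monotonic() - start
print(timed_out, woken, elapsed < 5.0)
```

Compared with a plain `time.sleep`, this lets the loop idle cheaply while still reacting to an event as soon as it happens.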

The Arbiter creates the sockets through sock.create_sockets, which binds the port and starts listening. The sockets are then passed on to the child process when a worker is forked:

worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                           self.app, self.timeout / 2.0,
                           self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
    worker.pid = pid  # record the worker's pid
    self.WORKERS[pid] = worker  # add it to the worker collection
    return pid
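Why the socket can simply be "passed" at fork time: a forked child inherits the parent's open file descriptors, so a socket bound in the master is usable in the worker without any extra hand-off. The sketch below shows this on a Unix system with one child; the port is chosen by the OS and the message text is made up for the demonstration.

```python
import os
import socket

# The parent creates and binds the listening socket before forking.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(5)
port = listener.getsockname()[1]

pid = os.fork()
if pid == 0:
    # Child ("worker"): the listener was inherited across fork().
    try:
        client, _ = listener.accept()
        client.sendall(b"hello from worker %d\n" % os.getpid())
        client.close()
    finally:
        os._exit(0)  # never fall through into the parent's code

# Parent ("master"): it never accepts; here it only plays the client.
with socket.create_connection(("127.0.0.1", port), timeout=5) as conn:
    reply = conn.makefile("rb").readline()
_, status = os.waitpid(pid, 0)
listener.close()
print(reply, status)
```

Because every worker shares the same listening socket, the kernel decides which worker's `accept()` gets each new connection, and the master does not need to know anything about individual clients.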

Workers are shut down with signals as well:

def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)
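The full life cycle of forking workers, signalling them, and reaping them can be sketched in a few lines. This is an illustration for Unix systems only; the module-level `WORKERS` dict and the `reap_workers` function are hypothetical simplifications, and the children merely sleep instead of serving requests.

```python
import os
import signal
import time

WORKERS = {}

for _ in range(3):
    pid = os.fork()
    if pid == 0:
        # Child: use default SIGTERM handling, then idle until told to stop.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        while True:
            time.sleep(1)
    WORKERS[pid] = "worker"  # parent: remember the child


def kill_workers(sig):
    """Send `sig` to every known worker."""
    for pid in list(WORKERS):
        os.kill(pid, sig)


def reap_workers():
    """Wait for every worker and record which signal ended it."""
    ended = {}
    for pid in list(WORKERS):
        _, status = os.waitpid(pid, 0)
        ended[pid] = os.WTERMSIG(status) if os.WIFSIGNALED(status) else None
        del WORKERS[pid]
    return ended


kill_workers(signal.SIGTERM)
ended = reap_workers()
print(sorted(ended.values()))
```

Reaping with `waitpid` matters as much as sending the signal: without it, terminated children would linger as zombie processes.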

Sync – worker

Next, let's look at the workers, mainly the sync worker implementation. The worker classes are related as follows:

  • Worker processes signals
    • SyncWorker processes HTTP requests synchronously
    • ThreadWorker uses threads to process HTTP requests

Continuing from the fork code in the Arbiter above: in the child process, once the worker has been created, control enters init_process:

# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)

The worker's init_process looks like this:

def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    ...
    # For waking ourselves up
    self.PIPE = os.pipe()
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]
    ...
    self.init_signals()  # initialize the signal handlers
    ...
    self.load_wsgi()  # load the WSGI application
    ...
    # Enter main run loop
    self.booted = True
    self.run()

The worker's signal handling:

SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])
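The last call, `signal.set_wakeup_fd`, asks the interpreter to write a byte to the given file descriptor whenever a signal arrives, so a `select()` that is waiting on the other end of the pipe returns immediately. A minimal sketch, assuming a Unix system and the main thread; the variable names are illustrative only.

```python
import os
import select
import signal

read_fd, write_fd = os.pipe()
os.set_blocking(write_fd, False)  # set_wakeup_fd requires a non-blocking fd

received = []
signal.signal(signal.SIGUSR1, lambda sig, frame: received.append(sig))
old_fd = signal.set_wakeup_fd(write_fd)

os.kill(os.getpid(), signal.SIGUSR1)  # send a signal to ourselves

# The interpreter wrote the signal number into the pipe, so select() returns.
ready, _, _ = select.select([read_fd], [], [], 5.0)
woken_by = os.read(read_fd, 1)[0] if ready else None

signal.set_wakeup_fd(old_fd)  # restore the previous wake-up fd
print(woken_by == signal.SIGUSR1, received)
```

This is why the worker adds the read end of its pipe to `wait_fds`: a signal then interrupts the wait in the same way an incoming connection would.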

The most important part of the worker is its run loop:

def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue
        except EnvironmentError as e:
            ...

        try:
            self.wait(timeout)  # wait for the next connection or wake-up
        except StopWaiting:
            return

Handling the client connection is similar to the HTTP handling covered in an earlier article, so it is not repeated here.

def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)

Once the worker has finished handling a request, it goes into wait:

def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise

Summary

The following diagram summarizes the Gunicorn workflow:

Tips

A Thread can be used to implement a periodic timer, as Gunicorn's reloader does:

# reloader.py

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
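The excerpt above leaves out `get_files` and the imports, so here is a self-contained sketch of the same polling idea that can be run as is. `FileWatcher` is a hypothetical class written for this illustration, not Gunicorn's `Reloader`; unlike the excerpt, it takes an explicit file list and records the new mtime so each change is reported once.

```python
import os
import tempfile
import threading
import time


class FileWatcher(threading.Thread):
    """Hypothetical polling watcher: calls callback(filename) on changes."""

    def __init__(self, files, interval=0.05, callback=None):
        super().__init__(daemon=True)  # do not keep the process alive
        self._files = list(files)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self._files:
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.setdefault(filename, mtime)
                if mtime > old_time:
                    mtimes[filename] = mtime  # report each change once
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)  # the "timer" part of the loop


changed = threading.Event()
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    path = tmp.name

watcher = FileWatcher([path], callback=lambda name: changed.set())
watcher.start()
time.sleep(0.2)                    # let the watcher record the initial mtime
future = time.time() + 10
os.utime(path, (future, future))   # bump the mtime without rewriting the file
fired = changed.wait(timeout=5)
os.unlink(path)
print(fired)
```

Marking the thread as a daemon means the watcher never prevents the process from exiting, which suits a background timer like this one.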

When you run gunicorn myapp:app, nothing imports myapp:app statically. It is loaded dynamically, like this:

# util.py

klass = components.pop(-1)

mod = importlib.import_module('.'.join(components))

return getattr(mod, klass)
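To make the idea concrete, here is a small sketch of resolving a `module:attribute` string at run time with `importlib`. The `load_object` helper and its `default_attr` parameter are assumptions made for this example and are not Gunicorn's API.

```python
import importlib


def load_object(uri, default_attr="application"):
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, _, attr = uri.partition(":")
    module = importlib.import_module(module_name)  # import by name at run time
    return getattr(module, attr or default_attr)


join = load_object("os.path:join")
print(join("a", "b"))
```

Loading by name is what lets one server binary run any application: the target is plain configuration data, not an import statement compiled into the server.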

Reference links

  • Gunicorn Design docs.gunicorn.org/en/stable/d…
  • Reading gunicorn code document gunicorn.readthedocs.io/en/latest/I…
  • Handling Unix Signals in Python stackabuse.com/handling-un…
  • How To Use Signal Driven Programming In Applications medium.com/fintechexpl…
  • Django with Nginx, Gunicorn medium.com/analytics-v…