Hi, I’m Sean, read the source code and I’ll see you on Thursday.

Werkzeug is a comprehensive WSGI web application library. It started as a simple collection of WSGI utilities and has become one of the most advanced WSGI utility libraries; it is also the project behind Flask. Werkzeug is the German word for “tool”. I find it a little hard to pronounce (probably one reason it is not better known), but the official logo happens to be a hammer, so I’ll just call it “The German Hammer” for short. If you want to pronounce Werkzeug correctly, check out the reference links at the bottom. This article is divided into the following parts:

  • The profile
  • serving && wsgi
  • request && response
  • Local implementation

The profile

The code version read here is 2.0.0, and the main structure of the project is as follows:

Module               Description
serving              HTTP server and WSGI specification implementation
request && response  Request and response processing
local                Thread-local (context-local) implementation
middleware           Middleware implementations
routing && urls      Routing and URL processing
datastructures       Data structures

The “German Hammer” project is very important, so I will read it thoroughly, combining skim reading with close reading. The write-up is split into two articles. This is the first one, and it covers the first three modules listed above.

Before we begin, let’s review HTTP servers and WSGI applications.

A brief review of HTTP services:

# http/server.py
def test(HandlerClass=SimpleHTTPRequestHandler, ServerClass=HTTPServer,
         protocol="HTTP/1.0", port=8000, bind=""):
    server_address = (bind, port)
    HandlerClass.protocol_version = protocol
    with ServerClass(server_address, HandlerClass) as httpd:
        sa = httpd.socket.getsockname()
        serve_message = "Serving HTTP on {host} port {port} (http://{host}:{port}/) ..."
        print(serve_message.format(host=sa[0], port=sa[1]))
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)

# self.rfile.readline(65537)
# self.wfile.write(body)
  • HTTPServer is responsible for implementing HTTP services.
  • SimpleHTTPRequestHandler handles HTTP requests.
  • The request is read from the rfile I/O stream and the response is written to the wfile I/O stream.

A brief review of WSGI applications:

# wsgiref/simple_server.py
def demo_app(environ, start_response):
    from io import StringIO
    stdout = StringIO()
    print("Hello world!", file=stdout)
    print(file=stdout)
    h = sorted(environ.items())
    for k, v in h:
        print(k, '=', repr(v), file=stdout)
    start_response("200 OK", [('Content-Type', 'text/plain; charset=utf-8')])
    return [stdout.getvalue().encode("utf-8")]

def make_server(
    host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
    """Create a new WSGI server listening on `host` and `port` for `app`"""
    server = server_class((host, port), handler_class)
    server.set_app(app)
    return server

if __name__ == '__main__':
    with make_server('', 8000, demo_app) as httpd:
        sa = httpd.socket.getsockname()
        print("Serving HTTP on", sa[0], "port", sa[1], "...")
        import webbrowser
        webbrowser.open('http://localhost:8000/xyz?abc')
        httpd.handle_request()  # serve one request, then exit
  • WSGIServer implements an HTTP server that conforms to the WSGI specification
  • WSGIRequestHandler handles WSGI requests
  • The WSGI application (demo_app here) implements the application side of WSGI
  • The application reads the request data from environ, sets the HTTP status and response headers through the start_response callback, and returns the response body as its return value
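The contract in the bullets above can be exercised without opening a socket. The following is a minimal sketch, not code from wsgiref or Werkzeug: hello_app and call_app are hypothetical names, and the fake start_response only records what the application passes to it.

```python
from wsgiref.util import setup_testing_defaults


def hello_app(environ, start_response):
    """A tiny WSGI application that echoes the request path."""
    body = f"Hello from {environ['PATH_INFO']}".encode("utf-8")
    start_response(
        "200 OK",
        [
            ("Content-Type", "text/plain; charset=utf-8"),
            ("Content-Length", str(len(body))),
        ],
    )
    return [body]


def call_app(app, path="/"):
    """Call a WSGI app the way a server would, but entirely in memory."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)  # fills in the remaining required keys
    captured = {}

    def start_response(status, headers, exc_info=None):
        # A real server would send these over the wire; here we just record them.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body
```

Calling call_app(hello_app, "/xyz") returns the status string, the header list, and the joined body bytes, which is exactly the information a WSGI server needs to write the HTTP response.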

serving

The serving module provides the service entry point and uses argparse to handle the command-line arguments:

def main() -> None:
    """A simple command-line interface for :py:func:`run_simple`."""
    import argparse
    ...
    run_simple(
        hostname=hostname or "127.0.0.1",
        port=int(port or 5000),
        application=import_string(args.application),
        use_reloader=args.reload,
        use_debugger=args.debug,
    )
  • The server host and port are the main parameters. The application can be given as an import name, which import_string loads automatically.

make_server is responsible for creating the server:
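Loading an application from a name given on the command line can be sketched with importlib. This is a simplified illustration under an assumed "module:attribute" format; load_object is a hypothetical helper and is not Werkzeug's import_string.

```python
import importlib


def load_object(import_name):
    """Load an object from a 'package.module:attribute' string.

    Hypothetical, simplified helper for illustration only. Without a
    colon, the module itself is returned.
    """
    module_name, sep, attr = import_name.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr) if sep else module
```

For example, load_object("json:dumps") returns the json.dumps function, in the same way that a name such as "myapp:application" (an assumed name) would resolve to a WSGI callable.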

def make_server(
    host: str,
    port: int,
    app: "WSGIApplication",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
    fd: t.Optional[int] = None,
) -> BaseWSGIServer:
    if threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host, port, app, processes, request_handler, passthrough_errors, ssl_context,fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
  • Depending on the parameters, you can create multithreaded, multiprocess, or normal services

Multi-threaded and multi-process services are combined using mixins:

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    multithread = True
    daemon_threads = True

class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    multiprocess = True
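Why the mixin has to come first in the base class list can be seen in a small standard-library sketch. PlainServer, ThreadedServer, and pick_server_class are hypothetical names chosen for this illustration; only socketserver.ThreadingMixIn and http.server.HTTPServer are real.

```python
import socketserver
from http.server import HTTPServer


class PlainServer(HTTPServer):
    """Handles one request at a time (hypothetical name)."""

    multithread = False


class ThreadedServer(socketserver.ThreadingMixIn, PlainServer):
    """Same server, but each request is processed in its own thread.

    ThreadingMixIn is listed first so that its process_request method
    takes precedence over the one inherited from BaseServer.
    """

    multithread = True
    daemon_threads = True


def pick_server_class(threaded=False):
    """Choose a server class from a flag, mirroring the idea of make_server."""
    return ThreadedServer if threaded else PlainServer
```

The mixin adds no state of its own; it only overrides how a request is dispatched, which is why the same base server can be combined with threading or forking behavior.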

The basic implementation of WSGIServer:

class BaseWSGIServer(HTTPServer):
    request_queue_size = LISTEN_QUEUE
    def __init__(
        self,
        host: str,
        port: int,
        app: "WSGIApplication",
        handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
        passthrough_errors: bool = False,
        ssl_context: t.Optional[_TSSLContextArg] = None,
        fd: t.Optional[int] = None,
    ) -> None:
        ...
  • Note that BaseWSGIServer inherits from HTTPServer; the wsgiref module is not used

The main implementation is in WSGIRequestHandler handling requests:

class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""
    
    def handle_one_request(self) -> None:
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        ...
        if self.parse_request():
            self.run_wsgi()

Each request executes the corresponding WSGI implementation:

def run_wsgi(self) -> None:
    self.environ = environ = self.make_environ()
    status_set: t.Optional[str] = None
    headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
    
    def write(data: bytes) -> None:
        self.wfile.write(data)
        self.wfile.flush()
        
    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal status_set, headers_set
        ...
        status_set = status
        headers_set = headers
        return write

    def execute(app: "WSGIApplication") -> None:
        application_iter = app(environ, start_response)
        try:
            for data in application_iter:
                write(data)
            if not headers_sent:
                write(b"")
        finally:
            if hasattr(application_iter, "close"):
                application_iter.close()  # type: ignore
    
    execute(self.server.app)
  • Generate the WSGI environ
  • Generate the start_response callback
  • Execute the app, passing in environ and the start_response callback, then iterate over the result and write it to wfile

make_environ converts the request data that was read into environ:
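The three steps above can be condensed into a self-contained sketch that writes to an in-memory buffer instead of a socket. It is a simplification under stated assumptions, not Werkzeug's run_wsgi: error handling, chunked encoding, and connection headers are left out, and the status line is hard-coded to HTTP/1.1.

```python
import io


def run_wsgi(app, environ, wfile):
    """Run one WSGI request and write a raw HTTP response to wfile."""
    state = {"status": None, "headers": None, "sent": False}

    def write(data):
        # Send the status line and headers lazily, before the first body bytes.
        if not state["sent"]:
            wfile.write(f"HTTP/1.1 {state['status']}\r\n".encode("latin-1"))
            for name, value in state["headers"]:
                wfile.write(f"{name}: {value}\r\n".encode("latin-1"))
            wfile.write(b"\r\n")
            state["sent"] = True
        wfile.write(data)

    def start_response(status, headers, exc_info=None):
        # Only remember the values; nothing is written until write() is called.
        state["status"], state["headers"] = status, headers
        return write

    app_iter = app(environ, start_response)
    try:
        for chunk in app_iter:
            write(chunk)
        if not state["sent"]:
            write(b"")  # no body chunks at all: still emit the headers
    finally:
        if hasattr(app_iter, "close"):
            app_iter.close()


def demo_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hi"]
```

Passing an io.BytesIO() as wfile makes it easy to inspect the bytes a client would receive, for example run_wsgi(demo_app, {}, buffer).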

 def make_environ(self) -> "WSGIEnvironment":
        environ: "WSGIEnvironment" = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.server.shutdown": shutdown_server,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }
        return environ

request && response

Request && Response is implemented in two layers. The bottom layer is a pure-logic structure in the sansio package. The upper layer, in the wrappers package, contains the WSGI implementation.

sansio.Request && sansio.Response

Start with the constructor of sansio.Request. The class carries an important docstring, so I have quoted the original:

class Request:
    """Represents the non-IO parts of a HTTP request, including the
    method, URL info, and headers.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :cls:`werkzeug.wrappers.Request`.
    """
    def __init__(
        self,
        method: str,
        scheme: str,
        server: t.Optional[t.Tuple[str, t.Optional[int]]],
        root_path: str,
        path: str,
        query_string: bytes,
        headers: Headers,
        remote_addr: t.Optional[str],
    ) -> None:
        ...

sansio.Request is an HTTP request implementation built on the sans-I/O idea, which keeps I/O and logic apart like a sandwich: inbound I/O, business logic, outbound I/O. A Request object implemented this way is abstract. It is not tied to any concrete synchronous or asynchronous I/O implementation, so it is more general and can be tested quickly. For a WSGI implementation, the higher-level werkzeug.wrappers.Request is recommended.

For those interested in sans-I/O, see the reference links.

sansio.Request is a fairly simple data model, with the familiar reading and assignment of fields. Three parts of the implementation are more distinctive. The first is properties wrapped by the cached_property decorator:
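The layering can be illustrated with a small sketch. PlainRequest and from_wsgi are hypothetical names for this illustration and are not part of Werkzeug; the point is only that the data class never touches a socket or an environ, while a thin adapter does the translation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlainRequest:
    """Pure data and logic: no sockets, no WSGI environ (hypothetical class)."""

    method: str
    path: str
    query_string: bytes

    @property
    def full_path(self) -> str:
        return f"{self.path}?{self.query_string.decode('latin-1')}"


def from_wsgi(environ) -> PlainRequest:
    """Inbound I/O layer: translate a WSGI environ into the plain object."""
    return PlainRequest(
        method=environ.get("REQUEST_METHOD", "GET"),
        path=environ.get("PATH_INFO") or "/",
        query_string=environ.get("QUERY_STRING", "").encode("latin-1"),
    )
```

Because PlainRequest can be constructed directly from literal values, its logic can be tested without a server, and a second adapter for another protocol could reuse it unchanged.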

@cached_property
def full_path(self) -> str:
    """Requested path, including the query string."""
    return f"{self.path}?{_to_str(self.query_string, self.url_charset)}"

Combining the decorator name with the function implementation, you can see that this property is cached after only one evaluation to improve performance. Then there are the properties defined by the header_property method:

content_type = header_property[str](
    "Content-Type",
    doc="""The Content-Type entity-header field indicates the media
    type of the entity-body sent to the recipient or, in the case of
    the HEAD method, the media type that would have been sent had
    the request been a GET.""",
    read_only=True,
)

Finally there is args, which lets you retrieve HTTP request parameters with calls such as request.args.get("old", type=int):
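Both techniques can be sketched with the standard library. This is an illustration under assumptions, not Werkzeug's code: functools.cached_property (Python 3.8+) stands in for Werkzeug's own cached_property, and HeaderProperty and MiniRequest are hypothetical, simplified classes.

```python
from functools import cached_property


class HeaderProperty:
    """Descriptor that exposes one header as an attribute.

    Hypothetical, simplified stand-in for the header_property idea.
    """

    def __init__(self, name, read_only=False):
        self.name = name
        self.read_only = read_only

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return obj.headers.get(self.name)

    def __set__(self, obj, value):
        if self.read_only:
            raise AttributeError("read only property")
        obj.headers[self.name] = value


class MiniRequest:
    content_type = HeaderProperty("Content-Type", read_only=True)

    def __init__(self, path, query, headers):
        self.path, self.query, self.headers = path, query, headers
        self.computed = 0  # counts how often full_path is actually evaluated

    @cached_property
    def full_path(self):
        self.computed += 1
        return f"{self.path}?{self.query}"
```

Reading full_path twice runs the function body only once, because the result is stored on the instance after the first access. Reading content_type always goes to the headers mapping, so the attribute and the header cannot drift apart.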

parameter_storage_class: t.Type[MultiDict] = ImmutableMultiDict

@cached_property
def args(self) -> "MultiDict[str, str]":
    """The parsed URL parameters (the part in the URL after the question
    mark).

    By default an :class:`~werkzeug.datastructures.ImmutableMultiDict`
    is returned from this function. This can be changed by setting
    :attr:`parameter_storage_class` to a different type. This might
    be necessary if the order of the form data is important.
    """
    return url_decode(
        self.query_string,
        self.url_charset,
        errors=self.encoding_errors,
        cls=self.parameter_storage_class,
    )

Request data needs to be immutable, so it relies heavily on the ImmutableMultiDict data structure, which is covered in more detail in the next article.

sansio.Response is similar to sansio.Request:
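The shape of a typed lookup on query parameters can be sketched with urllib.parse. get_arg is a hypothetical helper that only mimics the calling style of a multi-dict get with a type argument; it is not the ImmutableMultiDict implementation.

```python
from urllib.parse import parse_qs


def get_arg(query_string, key, default=None, type=None):
    """Return the first value for key, optionally converted with type.

    Hypothetical helper for illustration. If the conversion raises
    ValueError, the default is returned instead.
    """
    values = parse_qs(query_string).get(key)
    if not values:
        return default
    if type is None:
        return values[0]
    try:
        return type(values[0])
    except ValueError:
        return default  # conversion failed: fall back to the default
```

With the query string "old=18&old=20", get_arg(..., "old", type=int) yields the integer 18: a key may carry several values, and the plain lookup returns the first one.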

class Response:
    def __init__(
        self,
        status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
        headers: t.Optional[
            t.Union[
                t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
                t.Iterable[t.Tuple[str, t.Union[str, int]]],
            ]
        ] = None,
        mimetype: t.Optional[str] = None,
        content_type: t.Optional[str] = None,
    ) -> None:
        ...
    
    @property
    def status_code(self) -> int:
        """The HTTP status code as a number."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        self.status = code  # type: ignore

wrappers.Request

The wrappers Request and Response are a bit more complicated, so let’s read them separately. First, wrappers.Request:

class Request(_SansIORequest):
    """Represents an incoming WSGI HTTP request, with headers and body
    taken from the WSGI environment. Has properties and methods for
    using the functionality defined by various HTTP specs. The data in
    requests object is read-only.
    """

Request inherits from sansio.Request, and the comment details its functions and features (read-only).

The constructor shows that the object is built from environ and includes the familiar HTTP request properties such as method, scheme, and query_string:

def __init__(
        self,
        environ: "WSGIEnvironment",
        populate_request: bool = True,
        shallow: bool = False,
    ) -> None:
        super().__init__(
            method=environ.get("REQUEST_METHOD", "GET"),
            scheme=environ.get("wsgi.url_scheme", "http"),
            server=_get_server(environ),
            root_path=_wsgi_decoding_dance(
                environ.get("SCRIPT_NAME") or "", self.charset, self.encoding_errors
            ),
            path=_wsgi_decoding_dance(
                environ.get("PATH_INFO") or "", self.charset, self.encoding_errors
            ),
            query_string=environ.get("QUERY_STRING", "").encode("latin1"),
            headers=EnvironHeaders(environ),
            remote_addr=environ.get("REMOTE_ADDR"),
        )
        self.environ = environ
        ...

Plain query strings are simple, so let’s look at the more complex form handling. Form access in application code looks something like this:

def on_new_url(self, request):
    error = None
    url = ""
    if request.method == "POST":
        url = request.form["url"]
        ...

The form of wrappers.Request is also a cached_property, which improves efficiency, and the form data itself is parsed by FormDataParser:

form_data_parser_class: t.Type[FormDataParser] = FormDataParser

@cached_property
def form(self) -> "ImmutableMultiDict[str, str]":
    self._load_form_data()
    return self.form  # type: ignore

def _load_form_data(self) -> None:
    ...

    parser = self.form_data_parser_class(
            self._get_file_stream,
            self.charset,
            self.encoding_errors,
            self.max_form_memory_size,
            self.max_content_length,
            self.parameter_storage_class,
        )
    ...
    data = parser.parse(
        self._get_stream_for_parsing(),
        self.mimetype,
        self.content_length,
        self.mimetype_params,
    )

    d = self.__dict__
    d["stream"], d["form"], d["files"] = data

Here is a rough implementation of FormDataParser:

# formparser.py
class FormDataParser:
    def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        content_type = environ.get("CONTENT_TYPE", "")
        content_length = get_content_length(environ)
        mimetype, options = parse_options_header(content_type)
        return self.parse(get_input_stream(environ), mimetype, content_length, options)
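The same flow (read the content type, read the content length, then parse the input stream) can be sketched for the simplest case, a URL-encoded form. parse_form is a hypothetical helper built on the standard library, not Werkzeug's parser: multipart bodies are not handled, and only the last value of a repeated key is kept.

```python
import io
from urllib.parse import parse_qsl


def parse_form(environ):
    """Parse a URL-encoded form body from a WSGI environ into a dict.

    Simplified sketch for illustration only.
    """
    content_type = environ.get("CONTENT_TYPE", "")
    mimetype = content_type.split(";", 1)[0].strip().lower()
    if mimetype != "application/x-www-form-urlencoded":
        return {}
    length = int(environ.get("CONTENT_LENGTH") or 0)
    # Read exactly CONTENT_LENGTH bytes; reading more could block on a socket.
    body = environ["wsgi.input"].read(length)
    return dict(parse_qsl(body.decode("utf-8")))
```

An environ whose wsgi.input is an io.BytesIO holding b"url=https%3A%2F%2Fexample.com" produces {"url": "https://example.com"}, which is the kind of value that request.form["url"] reads in application code.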

wrappers.Response

Wrappers.Response is similar to wrappers.Request, inherited from sansio.Response:

class Response(_SansIOResponse):
    """Represents an outgoing WSGI HTTP response with body, status, and
    headers. Has properties and methods for using the functionality
    defined by various HTTP specs.
    ...
    The response object is itself a WSGI application callable. When
    called (:meth:`__call__`) with ``environ`` and ``start_response``,
    it will pass its status and headers to ``start_response`` then
    return its body as an iterable"""

The wrappers.Response comment also highlights the use of Response. Let’s look at the following example:

from werkzeug.wrappers.response import Response

def index():
    return Response("Hello, World!")

def application(environ, start_response):
    path = environ.get("PATH_INFO") or "/"
    if path == "/":
        response = index()
    else:
        response = Response("Not Found", status=404)
    return response(environ, start_response)

As the example shows, a Response object is created for each request, and the object’s __call__ method is then invoked with the environ and start_response arguments and its result returned.

Constructor of wrappers.Response:

def __init__(
        self,
        response: t.Optional[
            t.Union[t.Iterable[bytes], bytes, t.Iterable[str], str]
        ] = None,
        status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
        headers: t.Optional[
            t.Union[
                t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
                t.Iterable[t.Tuple[str, t.Union[str, int]]],
            ]
        ] = None,
        mimetype: t.Optional[str] = None,
        content_type: t.Optional[str] = None,
        direct_passthrough: bool = False,
    ) -> None:
        super().__init__(
            status=status,
            headers=headers,
            mimetype=mimetype,
            content_type=content_type,
        )
        ...
        if response is None:
            self.response = []
        elif isinstance(response, (str, bytes, bytearray)):
            self.set_data(response)
        else:
            self.response = response

The key __call__ method and its helper methods:

def __call__(
    self, environ: "WSGIEnvironment", start_response: "StartResponse"
) -> t.Iterable[bytes]:
    """Process this response as WSGI application.

    :param environ: the WSGI environment.
    :param start_response: the response callable provided by the WSGI
                           server.
    :return: an application iterator
    """
    app_iter, status, headers = self.get_wsgi_response(environ)
    start_response(status, headers)
    return app_iter

def get_app_iter(self, environ: "WSGIEnvironment") -> t.Iterable[bytes]:
    status = self.status_code
    if (
        environ["REQUEST_METHOD"] == "HEAD"
        or 100 <= status < 200
        or status in (204, 304)
    ):
        iterable: t.Iterable[bytes] = ()
    elif self.direct_passthrough:
        return self.response  # type: ignore
    else:
        iterable = self.iter_encoded()
    return ClosingIterator(iterable, self.close)
        
def get_wsgi_response(
    self, environ: "WSGIEnvironment"
) -> t.Tuple[t.Iterable[bytes], str, t.List[t.Tuple[str, str]]]:
    headers = self.get_wsgi_headers(environ)
    app_iter = self.get_app_iter(environ)
    return app_iter, self.status, headers.to_wsgi_list()

In short, the WSGI Response is converted into a status, a header list, and a result iterator for the WSGI server.
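The idea of a response object that is itself a WSGI application can be reduced to a few lines. MiniResponse is a hypothetical class written for this illustration, not Werkzeug's Response; it keeps only the status, headers, and body, plus the rule that HEAD requests and bodiless status codes send no body.

```python
from http import HTTPStatus


class MiniResponse:
    """A response object that is itself a WSGI application (hypothetical)."""

    def __init__(self, body="", status=200, content_type="text/plain; charset=utf-8"):
        self.data = body.encode("utf-8")
        self.status_code = status
        self.headers = [
            ("Content-Type", content_type),
            ("Content-Length", str(len(self.data))),
        ]

    @property
    def status(self):
        # Build the status line from the numeric code, e.g. "404 Not Found".
        return f"{self.status_code} {HTTPStatus(self.status_code).phrase}"

    def __call__(self, environ, start_response):
        start_response(self.status, self.headers)
        # HEAD requests and bodiless statuses must not send a body.
        if (
            environ.get("REQUEST_METHOD") == "HEAD"
            or 100 <= self.status_code < 200
            or self.status_code in (204, 304)
        ):
            return []
        return [self.data]
```

A view function can then simply return MiniResponse("Not Found", 404), and the surrounding application finishes the request with response(environ, start_response).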


Local implementation

local is a very important module of the German Hammer. First, take a look at an example that uses the standard threading.local:

import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-0s) %(message)s',)

def show(d):
    try:
        val = d.val
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def f(d):
    show(d)
    d.val = random.randint(1, 100)
    show(d)

if __name__ == '__main__':
    d = threading.local()
    show(d)
    d.val = 999
    show(d)

    for i in range(2):
        t = threading.Thread(target=f, args=(d,))
        t.start()

The value of the same variable d is different for different threads:

(MainThread) No value yet
(MainThread) value=999
(Thread-1) No value yet
(Thread-1) value=56
(Thread-2) No value yet
(Thread-2) value=38

threading.local addresses two main problems:

  • Data isolation between threads
  • The code is simple to write, and you only need to define a variable

In fact, thread isolation can be achieved with a plain dictionary, for example by storing each thread’s values under the thread ID as the key. The German Hammer’s local uses this idea. To also support greenlets (coroutines), it implements a new local instead of using threading.local directly.

try:
    from greenlet import getcurrent as _get_ident
except ImportError:
    from threading import get_ident as _get_ident

The official documentation for threading.get_ident is as follows:

threading.get_ident()

Return the “thread identifier” of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.

New in version 3.3.
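The dictionary-keyed-by-thread-identifier idea can be sketched as follows. All names here (_storage, set_value, get_value, release, worker, run_demo) are hypothetical and chosen for this illustration. Because identifiers may be recycled, each worker removes its entry before it exits; otherwise a later thread could see stale data.

```python
import threading

_storage = {}  # thread identifier -> that thread's private values
_lock = threading.Lock()


def set_value(name, value):
    with _lock:
        _storage.setdefault(threading.get_ident(), {})[name] = value


def get_value(name, default=None):
    with _lock:
        return _storage.get(threading.get_ident(), {}).get(name, default)


def release():
    """Drop the current thread's data so a recycled identifier starts clean."""
    with _lock:
        _storage.pop(threading.get_ident(), None)


def worker(results, index):
    try:
        before = get_value("val")  # nothing has been set in this thread yet
        set_value("val", index)
        results[index] = (before, get_value("val"))
    finally:
        release()


def run_demo():
    set_value("val", 999)  # value that belongs to the calling thread
    results = {}
    threads = [threading.Thread(target=worker, args=(results, i)) for i in (1, 2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return get_value("val"), results
```

Each worker sees no value at first and then only its own value, while the calling thread keeps 999. The explicit release step plays the role that a cleanup at the end of a request plays in a web application.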

The base of local is the ContextVar class:

class ContextVar:  # type: ignore
    """A fake ContextVar based on the previous greenlet/threading
    ident function. and old versions of gevent.
    """

    def __init__(self, _name: str) -> None:
        self.storage: t.Dict[int, t.Dict[str, t.Any]] = {}

    def get(self, default: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        return self.storage.get(_get_ident(), default)

    def set(self, value: t.Dict[str, t.Any]) -> None:
        self.storage[_get_ident()] = value

  • ContextVar holds a two-level dictionary whose outer key is the thread/coroutine identifier, which gives thread/coroutine data isolation.

The Local implementation is mainly a _storage attribute that holds a ContextVar object:

class Local:
    __slots__ = ("_storage",)

    def __init__(self) -> None:
        object.__setattr__(self, "_storage", ContextVar("local_storage"))
    
    def __getattr__(self, name: str) -> t.Any:
        values = self._storage.get({})
        try:
            return values[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)
  • Note that each time a value is set, the dictionary is copied first and the copy is modified (why? Discussion is welcome).
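One plausible reason for the copy, offered as an assumption rather than something the excerpt confirms, shows up when the storage is backed by the standard contextvars module: a copied context shares the same dictionary object with its parent, so an in-place mutation would leak across contexts. The function names below are hypothetical.

```python
import contextvars

storage = contextvars.ContextVar("storage")


def set_in_place(name, value):
    # Mutates the dictionary object that is shared with the parent context.
    storage.get()[name] = value


def set_with_copy(name, value):
    values = storage.get().copy()  # private copy for the current context
    values[name] = value
    storage.set(values)  # rebinding only affects the current context


def demo():
    storage.set({"user": "parent"})
    # A child context that copies before writing leaves the parent untouched.
    contextvars.copy_context().run(set_with_copy, "user", "child")
    after_copy = storage.get()["user"]
    # A child context that writes in place changes the parent's data too.
    contextvars.copy_context().run(set_in_place, "user", "leaked")
    after_in_place = storage.get()["user"]
    return after_copy, after_in_place
```

Here demo() returns ("parent", "leaked"): copying keeps the parent's value intact, while mutating in place overwrites it.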

A simple stack is implemented using Local:

class LocalStack:
    def __init__(self) -> None:
        self._local = Local()
    
    def push(self, obj: t.Any) -> t.List[t.Any]:
        """Pushes a new item to the stack"""
        rv = getattr(self._local, "stack", []).copy()
        rv.append(obj)
        self._local.stack = rv
        return rv  # type: ignore

    def pop(self) -> t.Any:
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

LocalManager is used to manage all local data.

class LocalManager:
    """Local objects cannot manage themselves. For that you need a local
    manager. You can pass a local manager multiple locals or add them
    later by appending them to `manager.locals`. Every time the manager
    cleans up, it will clean up all the data left in the locals for this
    context
    """
    
    def __init__(
        self,
        locals: t.Optional[t.Iterable[t.Union[Local, LocalStack]]] = None,
        ident_func: None = None,
    ) -> None:
        if locals is None:
            self.locals = []
        elif isinstance(locals, Local):
            self.locals = [locals]
        else:
            self.locals = list(locals)
        ...
    
    def cleanup(self) -> None:
        """Manually clean up the data in the locals for this context.  Call
        this at the end of the request or use `make_middleware()`.
        """
        for local in self.locals:
            release_local(local)

How is local used? As module-level singletons. Here is an example from Flask:

# flask/globals.py
# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()

_request_ctx_stack is a thread-safe global variable that can be read anywhere in the request-handling code, so the data does not have to be passed around.
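How such a global removes the need to pass the request around can be sketched with the standard library. In this illustration threading.local stands in for LocalStack, and push_request, pop_request, current_request, view, and handle are hypothetical names, not Flask or Werkzeug APIs.

```python
import threading

_ctx = threading.local()  # stand-in for LocalStack in this sketch


def push_request(request):
    stack = getattr(_ctx, "stack", [])
    _ctx.stack = stack + [request]


def pop_request():
    stack = getattr(_ctx, "stack", [])
    if not stack:
        return None
    _ctx.stack = stack[:-1]
    return stack[-1]


def current_request():
    """Return the request on top of the current thread's stack, if any."""
    stack = getattr(_ctx, "stack", [])
    return stack[-1] if stack else None


def view():
    # No request argument: the handler reads the context-local "global".
    return f"handling {current_request()['path']}"


def handle(request):
    push_request(request)
    try:
        return view()
    finally:
        pop_request()  # always clean up, even if the view raises
```

Each thread that calls handle sees only the request it pushed itself, which is why the module-level variable is safe to share.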

Reference links

  • Werkzeug documentation: werkzeug.palletsprojects.com/en/2.0.x/
  • Python technical term pronunciation guide (PyCon China 2020 talk): zhuanlan.zhihu.com/p/320457692
  • Sans I/O programming (PyCon UK talk): alexwlchan.net/2019/10/san…