Start APISIX

Check whether APISIX is running
local pid_path = env.apisix_home .. "/logs/nginx.pid"
local pid = util.read_file(pid_path)
pid = tonumber(pid)
if pid then
    local lsof_cmd = "lsof -p " .. pid
    local res, err = util.execute_cmd(lsof_cmd)
    if not (res and res == "") then
        if not res then
            print(err)
        else
            print("APISIX is running...")
        end

        return
    end

    print("nginx.pid exists but there's no corresponding process with pid ", pid,
          ", the file will be overwritten")
end

If the nginx.pid file exists, the pid is read from it and lsof is used to check whether a process with that pid is actually running. If it is, APISIX is already running and the start command simply returns; otherwise the stale pid file will be overwritten.

    init_etcd(env, args)
    util.execute_cmd(env.openresty_args)

Next come Nginx configuration file generation, etcd initialization, and execution of the OpenResty startup command:

Initialize Apisix
local function init(env)
    if env.is_root_path then
        print('Warning! Running apisix under /root is only suitable for '
              .. 'development environments and it is dangerous to do so. '
              .. 'It is recommended to run APISIX in a directory '
              .. 'other than /root.')
    end

    -- read_yaml_conf
    local yaml_conf, err = file.read_yaml_conf(env.apisix_home)
    if not yaml_conf then
        util.die("failed to read local yaml config of apisix: ", err, "\n")
    end
         ...
end     

First check whether the APISIX installation directory is under /root, then call read_yaml_conf(env.apisix_home): the installation path is used to build the configuration file path, the configuration file is read, and tinyyaml parses it into a Lua table.
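
As a minimal sketch of what that step amounts to (assuming the lua-tinyyaml parse API and a simplified config path; the real logic lives in file.read_yaml_conf and also merges the default configuration):

local yaml = require("tinyyaml")

local function read_yaml_conf(apisix_home)
    -- hypothetical path used for illustration only
    local path = apisix_home .. "/conf/config.yaml"
    local f, err = io.open(path, "r")
    if not f then
        return nil, err
    end
    local content = f:read("*a")
    f:close()

    local conf = yaml.parse(content)   -- tinyyaml returns a plain Lua table
    if not conf then
        return nil, "failed to parse " .. path
    end
    return conf
end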

local use_openresty_1_17 = false
    if not version_greater_equal(or_ver, "1.19.3") then
        use_openresty_1_17 = true
    end

    local or_info = util.execute_cmd("openresty -V 2>&1")
    local with_module_status = true
    if or_info and not or_info:find("http_stub_status_module", 1, true) then
        stderr:write("'http_stub_status_module' module is missing in ",
                     "your openresty, please check it out. Without this ",
                     "module, there will be fewer monitoring indicators.\n")
        with_module_status = false
    end

    local use_apisix_openresty = true
    if or_info and not or_info:find("apisix-nginx-module", 1, true) then
        use_apisix_openresty = false
    end

    local enabled_plugins = {}
    for i, name in ipairs(yaml_conf.plugins) do
        enabled_plugins[name] = true
    end
 ....
 
   local sys_conf = {
        use_openresty_1_17 = use_openresty_1_17,
        lua_path = env.pkg_path_org,
        lua_cpath = env.pkg_cpath_org,
        os_name = util.trim(util.execute_cmd("uname")),
        apisix_lua_home = env.apisix_home,
        with_module_status = with_module_status,
        use_apisix_openresty = use_apisix_openresty,
        error_log = {level = "warn"},
        enabled_plugins = enabled_plugins,
        dubbo_upstream_multiplex_count = dubbo_upstream_multiplex_count,
        tcp_enable_ssl = tcp_enable_ssl,
    }
    
  ....  

Then the system context is prepared: a series of parameter checks and transformations gather information about the runtime environment, assign it to sys_conf, and validate it.

local conf_render = template.compile(ngx_tpl)
    local ngxconf = conf_render(sys_conf)

    local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",
                                    ngxconf)

Finally, lua-resty-template renders ngx_tpl (the Nginx configuration template) with sys_conf: the sys_conf values are substituted into the template to produce ngxconf, which is written out as the nginx.conf file.
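
As a self-contained illustration of the same compile-and-render flow, here is a toy template rendered with lua-resty-template in place of the real ngx_tpl:

local template = require("resty.template")

-- a toy template standing in for ngx_tpl
local tpl = [[
worker_processes {* worker_processes *};
error_log logs/error.log {* error_log_level *};
]]

local render = template.compile(tpl)    -- compile once
local conf_text = render({              -- render with a sys_conf-like table
    worker_processes = "auto",
    error_log_level = "warn",
})
print(conf_text)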

Initialize the etcd
function _M.init(env, args)
    ...
    local host_count = #(yaml_conf.etcd.host)
    local scheme
    for i = 1, host_count do
        local host = yaml_conf.etcd.host[i]
        local fields = util.split(host, "://")
        if not fields then
            util.die("malformed etcd endpoint: ", host, "\n")
        end

        if not scheme then
            scheme = fields[1]
        elseif scheme ~= fields[1] then
            print([[WARNING: mixed protocols among etcd endpoints]])
        end
    end

    -- check the etcd cluster version
    for index, host in ipairs(yaml_conf.etcd.host) do
        local version_url = host .. "/version"
        local errmsg

        local res, err
        local retry_time = 0
        while retry_time < 2 do
            res, err = request(version_url, yaml_conf)
            -- In case of failure, request returns nil followed by an error message.
            -- Else the first return value is the response body
            -- and followed by the response status code.
            if res then
                break
            end
            retry_time = retry_time + 1
            print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                             version_url, err, retry_time))
        end

        if not res then
            errmsg = str_format("request etcd endpoint \'%s\' error, %s\n", version_url, err)
            util.die(errmsg)
        end

        local body, _, err = dkjson.decode(res)
        if err or (body and not body["etcdcluster"]) then
            errmsg = str_format("got malformed version message: \"%s\" from etcd \"%s\"\n",
                                res, version_url)
            util.die(errmsg)
        end

        local cluster_version = body["etcdcluster"]
        if compare_semantic_version(cluster_version, env.min_etcd_version) then
            util.die("etcd cluster version ", cluster_version,
                     " is less than the required version ", env.min_etcd_version,
                     ", please upgrade your etcd cluster\n")
        end
    end

etcd initialization starts by reading and validating the configuration file, parsing the set of etcd cluster endpoints, fetching the etcd cluster version (with up to two attempts per endpoint), and checking that version with compare_semantic_version.
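
compare_semantic_version itself is not shown above; the idea is a plain field-by-field numeric comparison, roughly like this sketch (not APISIX's actual implementation):

-- returns true when version `a` is lower than version `b`
-- (e.g. "3.3.9" < "3.4.0"), which is the condition used above to reject an old etcd
local function compare_semantic_version(a, b)
    local pa, pb = {}, {}
    for num in a:gmatch("%d+") do pa[#pa + 1] = tonumber(num) end
    for num in b:gmatch("%d+") do pb[#pb + 1] = tonumber(num) end

    for i = 1, math.max(#pa, #pb) do
        local x, y = pa[i] or 0, pb[i] or 0
        if x ~= y then
            return x < y
        end
    end
    return false   -- equal versions are acceptable
end

print(compare_semantic_version("3.3.9", "3.4.0"))  -- true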

    for index, host in ipairs(yaml_conf.etcd.host) do
        local is_success = true
        local errmsg
        local auth_token
        local user = yaml_conf.etcd.user
        local password = yaml_conf.etcd.password
        if user and password then
            local auth_url = host .. "/v3/auth/authenticate"
            local json_auth = {
                name = etcd_conf.user,
                password = etcd_conf.password
            }
            local post_json_auth = dkjson.encode(json_auth)
            local response_body = {}

            local res, err
            local retry_time = 0
            while retry_time < 2 do
                res, err = request({
                    url = auth_url,
                    method = "POST",
                    source = ltn12.source.string(post_json_auth),
                    sink = ltn12.sink.table(response_body),
                    headers = {
                        ["Content-Length"] = #post_json_auth
                    }
                }, yaml_conf)
                -- In case of failure, request returns nil followed by an error message.
                -- Else the first return value is just the number 1
                -- and followed by the response status code.
                if res then
                    break
                end
                retry_time = retry_time + 1
                print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                                 auth_url, err, retry_time))
            end

            if not res then
                errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", auth_url, err)
                util.die(errmsg)
            end

            local res_auth = table_concat(response_body)
            local body_auth, _, err_auth = dkjson.decode(res_auth)
            if err_auth or (body_auth and not body_auth["token"]) then
                errmsg = str_format("got malformed auth message: \"%s\" from etcd \"%s\"\n",
                                    res_auth, auth_url)
                util.die(errmsg)
            end

            auth_token = body_auth.token
        end

Then /v3/auth/authenticate is called to obtain an etcd token; subsequent requests carry it in the Authorization header for authentication.

local dirs = {}
        for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
            dirs[name] = true
        end
        for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
            dirs[name] = true
        end

        for dir_name in pairs(dirs) do
            local key =  (etcd_conf.prefix or "") .. dir_name .. "/"

            local put_url = host .. "/v3/kv/put"
            local post_json = '{"value":"' .. base64_encode("init_dir")
                              .. '", "key":"' .. base64_encode(key) .. '"}'
            local response_body = {}
            local headers = {["Content-Length"] = #post_json}
            if auth_token then
                headers["Authorization"] = auth_token
            end

            local res, err
            local retry_time = 0
            while retry_time < 2 do
                res, err = request({
                    url = put_url,
                    method = "POST",
                    source = ltn12.source.string(post_json),
                    sink = ltn12.sink.table(response_body),
                    headers = headers
                }, yaml_conf)
                retry_time = retry_time + 1
                if res then
                    break
                end
                print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                                 put_url, err, retry_time))
            end

            if not res then
                errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", put_url, err)
                util.die(errmsg)
            end

            local res_put = table_concat(response_body)
            if res_put:find("404 page not found", 1, true) then
                errmsg = str_format("gRPC gateway is not enabled in etcd cluster \"%s\",",
                                    "which is required by Apache APISIX\n")
                util.die(errmsg)
            end

            if res_put:find("error", 1, true) then
                is_success = false
                if (index == host_count) then
                    errmsg = str_format("got malformed key-put message: \"%s\" from etcd \"%s\"\n",
                                        res_put, put_url)
                    util.die(errmsg)
                end

                break
            end

            if args and args["verbose"] then
                print(res_put)
            end
        end

        if is_success then
            etcd_ok = true
            break
        end
    end

    if not etcd_ok then
        util.die("none of the configured etcd works well")
    end


Each directory key is written with local post_json = '{"value":"' .. base64_encode("init_dir") .. '", "key":"' .. base64_encode(key) .. '"}'. After the keys are created successfully, the directory structure generated in etcd is:

@11d2d7374c12:/$ etcdctl get --prefix ""
/apisix/consumers/
init_dir
/apisix/data_plane/server_info/039a6657-8d21-4b3b-b364-32c1ad92a6a6
{"id":"039a6657-8d21-4b3b-b364-32c1ad92a6a6","etcd_version":"3.4.0","version":"2.5","hostname":"4364cdfb7e75","up_time":13941,"boot_time":1625041981,"last_report_time":1625055922}
/apisix/global_rules/
init_dir
/apisix/plugin_configs/
init_dir
/apisix/plugin_metadata/
init_dir
/apisix/plugins
[{"name":"api-breaker"},{"name":"authz-keycloak"},{"name":"basic-auth"},{"name":"batch-requests"},{"name":"consumer-restriction"},{"name":"cors"},{"name":"echo"},{"name":"fault-injection"},{"name":"grpc-transcode"},{"name":"hmac-auth"},{"name":"http-logger"},{"name":"ip-restriction"},{"name":"jwt-auth"},{"name":"kafka-logger"},{"name":"key-auth"},{"name":"limit-conn"},{"name":"limit-count"},{"name":"limit-req"},{"name":"openid-connect"},{"name":"prometheus"},{"name":"proxy-cache"},{"name":"proxy-mirror"},{"name":"proxy-rewrite"},{"name":"redirect"},{"name":"referer-restriction"},{"name":"request-id"},{"name":"request-validation"},{"name":"response-rewrite"},{"name":"serverless-post-function"},{"name":"serverless-pre-function"},{"name":"sls-logger"},{"name":"syslog"},{"name":"tcp-logger"},{"name":"udp-logger"},{"name":"uri-blocker"},{"name":"wolf-rbac"},{"name":"zipkin"},{"name":"server-info"},{"name":"traffic-split"},{"name":"mqtt-proxy","stream":true}]
/apisix/plugins/
init_dir
/apisix/proto/
init_dir
/apisix/routes/
init_dir
/apisix/services/
init_dir
/apisix/ssl/
init_dir
/apisix/stream_routes/
init_dir
/apisix/upstreams/
init_dir
Start OpenResty

APISIX is an application built on OpenResty, so the last step is to start OpenResty via util.execute_cmd(env.openresty_args), where:

local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]] .. apisix_home .. [[/conf/nginx.conf]]

Main process initialization

By looking at the previously generated Nginx configuration file:

init_by_lua_block {
    require "resty.core"
    apisix = require("apisix")
    local dns_resolver = {"127.0.0.11", }
    local args = {
        dns_resolver = dns_resolver,
    }
    apisix.http_init(args)
}

You can see that the main process initializes when apisix.http_init is called:

function _M.http_init(args)
    require("resty.core")

    if require("ffi").os == "Linux" then
        require("ngx.re").opt("jit_stack_size", 200 * 1024)
    end

    require("jit.opt").start("minstitch=2", "maxtrace=4000",
                             "maxrecord=8000", "sizemcode=64",
                             "maxmcode=4000", "maxirconst=1000")

    core.resolver.init_resolver(args)
    core.id.init()

    local process = require("ngx.process")
    local ok, err = process.enable_privileged_agent()
    if not ok then
        core.log.error("failed to enable privileged_agent: ", err)
    end

    if core.config.init then
        local ok, err = core.config.init()
        if not ok then
            core.log.error("failed to load the configuration: ", err)
        end
    end
end


This method sets the JIT parameters and starts the JIT, generates the APISIX UID, enables the privileged agent process, and then calls core.config.init().

core.config.init()

Let’s first look at the definition of config in core.lua:

local config_center = local_conf.apisix and local_conf.apisix.config_center
                      or "etcd"
log.info("use config_center: ", config_center)
local config = require("apisix.core.config_" .. config_center)
config.type = config_center

The config_center set in the local configuration is used; if it is not configured, it defaults to etcd, i.e. config = apisix.core.config_etcd.

Now look at its init operation:

function _M.init()
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
        return true
    end

    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end

    return true
end

*local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))*

create_formatter(prefix) generates a formatter based on the locally configured etcd prefix (default "/apisix").

readdir reads the directory contents from etcd and formats them with that formatter.

local function readdir(etcd_cli, key, formatter)
    if not etcd_cli then
        return nil, "not inited"
    end

    local res, err = etcd_cli:readdir(key)
    if not res then
        -- log.error("failed to get key from etcd: ", err)
        return nil, err
    end

    if type(res.body) ~= "table" then
        return nil, "failed to read etcd dir"
    end

    res, err = etcd_apisix.get_format(res, key .. '/', true, formatter)
    if not res then
        return nil, err
    end

    return res
end

The readdir function of the etcd client is called first to pull the contents of the specified key from etcd (here the key is /apisix, i.e. the root directory), and then etcd_apisix.get_format(res, key .. '/', true, formatter) further processes the data with the formatter passed in:

function _M.get_format(res, real_key, is_dir, formatter)
...

    if formatter then
        return formatter(res)
    end

...
end

The formatter iterates over the data returned by etcd and stores it in loaded_configuration (defined in config_etcd) for later use.
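
Roughly speaking, the formatter walks the key-value pairs returned for the /apisix prefix, groups them by their first path segment, and stashes them in loaded_configuration in the same body/headers shape that new() later looks up. A heavily simplified sketch (not APISIX's actual create_formatter):

-- illustrative only: group /apisix/* kvs by directory so that, e.g., all
-- /apisix/routes/* entries end up under loaded_configuration["/routes"]
local loaded_configuration = {}

local function create_formatter(prefix)
    return function(res)
        local dirs = {}
        for _, kv in ipairs(res.body.kvs or {}) do
            local sub_key = kv.key:sub(#prefix + 1)   -- e.g. "/routes/1"
            local dir = sub_key:match("^(/[^/]+)")    -- e.g. "/routes"
            if dir then
                dirs[dir] = dirs[dir] or {}
                table.insert(dirs[dir], kv)
            end
        end

        -- later, config_etcd.new("/routes", ...) can pick this up without
        -- asking etcd again (see the automatic branch further below)
        for dir, kvs in pairs(dirs) do
            loaded_configuration[dir] = {
                body = { kvs = kvs },
                headers = res.headers,
            }
        end
        return res
    end
end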

Worker process initialization


Apisix.http_init_worker () is responsible for the initialization of the worker process by calling the init_work() function of each component. Let’s focus on the following steps:

function _M.http_init_worker()
...
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
...    
    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end
...
end
Service discovery initialization

The first is the initialization of service discovery (if it is configured):

if discovery_type then
    for discovery_name, _ in pairs(discovery_type) do
        log.info("use discovery: ", discovery_name)
        discovery[discovery_name] = require("apisix.discovery." .. discovery_name)
    end
end

function discovery.init_worker()
    if discovery_type then
        for discovery_name, _ in pairs(discovery_type) do
            discovery[discovery_name].init_worker()
        end
    end
end

In discovery.init.lua, all the service discovery components configured locally call their init_worker() in turn to initialize them. Take Eureka for example:

function _M.init_worker()
    if not local_conf.discovery.eureka or
        not local_conf.discovery.eureka.host or #local_conf.discovery.eureka.host == 0 then
        error("do not set eureka.host")
        return
    end

    local ok, err = core.schema.check(schema, local_conf.discovery.eureka)
    if not ok then
        error("invalid eureka configuration: " .. err)
        return
    end
    default_weight = local_conf.discovery.eureka.weight or 100
    log.info("default_weight:", default_weight, ".")
    local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
    log.info("fetch_interval:", fetch_interval, ".")
    ngx_timer_at(0, fetch_full_registry)
    ngx_timer_every(fetch_interval, fetch_full_registry)
end

In eureka.lua, ngx_timer_at(0, fetch_full_registry) pulls the available service nodes from Eureka immediately, and ngx_timer_every(fetch_interval, fetch_full_registry) then starts a timer that pulls node data periodically according to the locally configured fetch_interval (30 seconds by default).

Plug-in initialization

Next comes the initialization of the plug-in:

  local_conf = core.config.local_conf(true)
        http_plugin_names = local_conf.plugins
        stream_plugin_names = local_conf.stream_plugins

local function load(plugin_names)
    local processed = {}
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            processed[name] = true
        end
    end

    core.log.warn("new plugins: ", core.json.delay_encode(processed))

    for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for name in pairs(processed) do
        load_plugin(name, local_plugins)
    end

   ...
end

During plugin initialization, the local plugin configuration is read first, the previously loaded plugins are unloaded one by one, and then the configured plugins are loaded in order:

local function load_plugin(name, plugins_list, is_stream_plugin)
    local pkg_name = "apisix.plugins." .. name
    if is_stream_plugin then
        pkg_name = "apisix.stream.plugins." .. name
    end

    local ok, plugin = pcall(require, pkg_name)
...
    plugin.name = name
    plugin.attr = plugin_attr(name)
    core.table.insert(plugins_list, plugin)

    if plugin.init then
        plugin.init()
    end

    return
end

The module path prefix "apisix.plugins." is concatenated with the plugin name and required; plugin_attr(name) then injects the locally configured plugin attributes, the plugin is inserted into local_plugins defined in plugin.lua, and finally each plugin's own init function is called (if it has one).

 -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    for i, plugin in ipairs(local_plugins) do
        local_plugins_hash[plugin.name] = plugin
        if local_conf and local_conf.apisix
           and local_conf.apisix.enable_debug then
            core.log.warn("loaded plugin and sort by priority:",
                          " ", plugin.priority,
                          " name: ", plugin.name)
        end
    end

    _M.load_times = _M.load_times + 1
    core.log.info("load plugin times: ", _M.load_times)
    return true

After all local plugins are loaded, the plugin list is sorted by priority and each plugin is placed in the local_plugins_hash table, keyed by its name.
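
For reference, an APISIX plugin module is essentially a Lua table with a name, a priority, a schema and per-phase handlers. A stripped-down illustration (not a complete plugin) of the shape that load_plugin expects:

local core = require("apisix.core")

local schema = {
    type = "object",
    properties = {
        message = { type = "string", default = "hello" },
    },
}

local _M = {
    version = 0.1,
    priority = 1000,          -- used by the priority sort shown above
    name = "example-plugin",  -- hypothetical plugin name
    schema = schema,
}

function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

-- handler for the "access" phase; run_plugin calls it when the plugin
-- is enabled on the matched route
function _M.access(conf, ctx)
    core.log.info("example-plugin says: ", conf.message)
end

return _M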

Route initialization
function _M.http_init_worker()
    local conf = core.config.local_conf()
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        router_ssl_name = conf.apisix.router.ssl or router_ssl_name
    end

    local router_http = require("apisix.http.router." .. router_http_name)
    attach_http_router_common_methods(router_http)
    router_http.init_worker(filter)
    _M.router_http = router_http

    local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
    router_ssl.init_worker()
    _M.router_ssl = router_ssl

APISIX provides three route matching implementations: radixtree_host_uri, radixtree_uri, and radixtree_uri_with_parameter; radixtree_uri (a compressed prefix tree, i.e. radix tree) is used by default. attach_http_router_common_methods(router_http) binds two default methods to radixtree_uri.lua (if it does not provide its own implementation):

init_worker(): calls the new function in apisix.core.config_etcd.lua (the default) to pull the route information from the /routes directory in etcd and assign it to user_routes

routes(): returns all the routes pulled by init_worker() above (the route data and its version)

After the method binding is complete, init_worker() is called to pull the routes, and _M.router_http = router_http saves the initialized radixtree_uri router in router.lua, so that it can later be used for matching via router.router_http.match(api_ctx).
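
The underlying matching is done by lua-resty-radixtree. A stripped-down sketch of how such a router is built and queried, independent of APISIX's wrapper code (assumes an OpenResty runtime, e.g. the resty command-line tool):

local radix = require("resty.radixtree")

-- build a router from two illustrative rules
local rx = radix.new({
    {
        paths = { "/hello" },
        methods = { "GET" },
        metadata = "route-1",
    },
    {
        paths = { "/user/*" },
        metadata = "route-2",
    },
})

-- match() returns the metadata of the winning rule, or nil if nothing matches
ngx.say(rx:match("/hello", { method = "GET" }))   -- route-1
ngx.say(rx:match("/user/1", {}))                  -- route-2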

Data update

The new function in apisix.core.config_etcd.lua, mentioned above, retrieves data from the specified etcd directory:

function _M.new(key, opts)
    ...
    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker

    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
    }, mt)
    ...

obj is the object that is eventually returned; it predefines a set of fields with their initial values, and setmetatable() sets mt as its metatable:

local mt = {
    __index = _M,
    __tostring = function(self)
        return " etcd key: " .. self.key
    end
}
local _M = {
    version = 0.3,
    local_conf = config_local.local_conf,
    clear_local_cache = config_local.clear_cache,
}

Here's a sidebar: mt defines an __index key, which means that when a table is accessed by a key that has no value, Lua looks up __index in the table's metatable; if __index holds a table, Lua then looks for the key in that table instead.
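
A tiny generic Lua example of that fallback behaviour, unrelated to APISIX itself:

local defaults = { scheme = "http", timeout = 60 }
local conf = setmetatable({ timeout = 3 }, { __index = defaults })

print(conf.timeout)   -- 3   : found directly in conf
print(conf.scheme)    -- http: missing in conf, looked up via __index
print(conf.retries)   -- nil : missing in both tables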

Moving on to the new function:

if automatic then
        if not key then
            return nil, "missing `key` argument"
        end

        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            loaded_configuration[key] = nil -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            load_full_data(obj, dir_res, headers)
        end

        ngx_timer_at(0, _automatic_fetch, obj)

    else
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start a etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end

    if key then
        created_obj[key] = obj
    end

    return obj

If automatic is set, the function first checks whether loaded_configuration (filled during master process initialization) already contains data for this key; if so, load_full_data assigns that data to obj directly and sets need_reload to false, so the next readdir request can be skipped.

ngx_timer_at(0, _automatic_fetch, obj)

ngx_timer_at creates an Nginx timer; in the event loop Nginx finds the expired timer and executes the corresponding Lua callback, _automatic_fetch, in a separate coroutine. This is the automatic update logic.

Next, let’s look at the automatic update logic, which is also the main logic of Apisix and ETCD to implement hot update, hot plug-in:

local function _automatic_fetch(premature, self)
    if premature then
        return
    end

    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end

            local ok, err = sync_data(self)
            
          ...
    end

    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end

A small aside: in while not exiting() and self.running and i <= 32 do there is a bare constant 32 that is never explained anywhere in the code; I honestly don't know what it is for.

Here, local etcd_cli, err = get_etcd() obtains the etcd client and assigns it to obj, and local ok, err = sync_data(self) performs the data update:

local function sync_data(self)
    if not self.key then
        return nil, "missing 'key' arguments"
    end

    if self.need_reload then
        local res, err = readdir(self.etcd_cli, self.key)
        ...
        -- clean up the old values first
        if self.values then
            for i, val in ipairs(self.values) do
                if val and val.clean_handlers then
                    for _, clean_handler in ipairs(val.clean_handlers) do
                        clean_handler(val)
                    end
                    val.clean_handlers = nil
                end
            end

            self.values = nil
            self.values_hash = nil
        end

        -- assign the new data
        load_full_data(self, dir_res, headers)

        return true
    end

    local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)
    log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
    log.info("res: ", json.delay_encode(dir_res, true))
    ...
    local res_copy = res
    -- waitdir will return [res] even for self.single_item = true
    for _, res in ipairs(res_copy) do
        ...
        self:upgrade_version(res.modifiedIndex)
        ...
        local pre_index = self.values_hash[key]
        if pre_index then
            local pre_val = self.values[pre_index]
            if pre_val and pre_val.clean_handlers then
                for _, clean_handler in ipairs(pre_val.clean_handlers) do
                    clean_handler(pre_val)
                end
                pre_val.clean_handlers = nil
            end

            if res.value then
                if not self.single_item then
                    res.value.id = key
                end

                self.values[pre_index] = res
                res.clean_handlers = {}
                log.info("update data by key: ", key)
            else
                self.sync_times = self.sync_times + 1
                self.values[pre_index] = false
                self.values_hash[key] = nil
                log.info("delete data by key: ", key)
            end
        elseif res.value then
            res.clean_handlers = {}
            insert_tab(self.values, res)
            self.values_hash[key] = #self.values
            if not self.single_item then
                res.value.id = key
            end
            log.info("insert data by key: ", key)
        end
        ...
        if self.filter then
            self.filter(res)
        end

        self.conf_version = self.conf_version + 1
    end

    return self.values
end

About need_reload: the default /apisix directory was already loaded once when the master process initialized, so for it need_reload is false; the remaining directories are loaded here for the first time via local res, err = readdir(self.etcd_cli, self.key). After the initial load, local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout) is used instead, which in turn calls local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts), the watchdir function of lua-resty-etcd, i.e. etcd's watch mechanism is used to listen for changes on the specified key.

insert_tab(self.values, res)
self.values_hash[key] = #self.values
self.conf_version = self.conf_version + 1

When updated data is obtained, obj is reassigned and the version is updated.

new returns a composite object containing the etcd client, the directory data (values), the filter, and so on; depending on automatic, it decides whether to watch the etcd directory for automatic updates.
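
A caller would watch a directory roughly like this (a sketch based on the new() signature shown above; real callers such as the route module also pass item_schema, checker and filter options):

local config_etcd = require("apisix.core.config_etcd")

-- sketch: watch the /routes directory and keep its contents in sync
local user_routes, err = config_etcd.new("/routes", {
    automatic = true,   -- start the _automatic_fetch timer shown above
})
if not user_routes then
    error("failed to create etcd config watcher: " .. (err or "unknown"))
end

-- afterwards:
--   user_routes.values        holds the current items under /apisix/routes
--   user_routes.conf_version  is bumped on every change (used as cached_version
--                             by the radixtree router, see "Route matching" below)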

Request processing

Then we analyze the flow of the request when it comes in:

 access_by_lua_block {
    apisix.http_access_phase()
}
ngx.ctx preprocessing
function _M.http_access_phase()
    local ngx_ctx = ngx.ctx

    if ngx_ctx.api_ctx and ngx_ctx.api_ctx.ssl_client_verified then
        local res = ngx_var.ssl_client_verify
        if res ~= "SUCCESS" then
            if res == "NONE" then
                core.log.error("client certificate was not present")
            else
                core.log.error("clent certificate verification is not passed: ", res)
            end
            return core.response.exit(400)
        end
    end

    -- always fetch table from the table pool, we don't need a reused api_ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    ngx_ctx.api_ctx = api_ctx

    core.ctx.set_vars_meta(api_ctx)
...

ngx.ctx is the request context; it has the same life cycle as a single request and is used to share data between the different phases of the request. core.ctx.set_vars_meta(api_ctx) sets the metatable for var. api_ctx.var is used whenever the request needs to read an external variable: according to the __index mechanism described above, if a key cannot be found in the var table, the __index function in the metatable is consulted. That function fetches the corresponding nginx variable through LuaJIT's FFI, which performs better than ngx.var.*, and it also caches the result:

            local cached = t._cache[key]
            if cached ~= nil then
                return cached
            end
            
            ....   
            
            if val ~= nil then
                t._cache[key] = val
            end
Route matching
    ...
    if router.api.has_route_not_under_apisix() or
       core.string.has_prefix(uri, "/apisix/")
    then
        local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api
        local matched = router.api.match(api_ctx, skip)
        if matched then
            return
        end
    end

    router.router_http.match(api_ctx)

    -- run global rule
    plugin.run_global_rules(api_ctx, router.global_rules, nil)
    ...

has_route_not_under_apisix() defaults to true. Looking at its references, the fetch_api_router() function assigns has_route_not_under_apisix: the list of local plugins is traversed, and it becomes true if any plugin exposes an API whose path is not prefixed with /apisix/, such as Prometheus' export_uri.

core.string.has_prefix(uri, "/apisix/") checks whether the current request URI starts with /apisix/.

If either condition holds, the matching logic of router.api.match is used, and if a match is found the request returns there. This also implies that business routes should avoid the /apisix prefix when defining routing rules, since the APIs exposed by APISIX itself or by plugins generally use the /apisix prefix.

Then let’s look at the real (business-level) routing matching rules:

router.router_http.match(api_ctx) calls the match function of the default router_http, i.e. radixtree_uri:

function _M.match(api_ctx)
    local user_routes = _M.user_routes
    if not cached_version or cached_version ~= user_routes.conf_version then
        uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                             uri_routes, false)
        cached_version = user_routes.conf_version
    end

    if not uri_router then
        core.log.error("failed to fetch valid `uri` router: ")
        return true
    end

    return base_router.match_uri(uri_router, match_opts, api_ctx)
end

On the first call cached_version is nil, and whenever the routes in etcd change user_routes.conf_version changes as well; in either case uri_router is rebuilt from the latest route data pulled from etcd and cached_version is updated. Finally the request is handed to base_router.match_uri, i.e. the match_uri function of apisix.http.route:

function _M.match_uri(uri_router, match_opts, api_ctx)
    core.table.clear(match_opts)
    match_opts.method = api_ctx.var.request_method
    match_opts.host = api_ctx.var.host
    match_opts.remote_addr = api_ctx.var.remote_addr
    match_opts.vars = api_ctx.var
    match_opts.matched = core.tablepool.fetch("matched_route_record", 0, 4)

    local ok = uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx, match_opts)
    return ok
end

Dispatch ultimately performs the route matching.

plugin.run_global_rules(api_ctx, router.global_rules, nil) is the matching logic for global rules:

        local plugins = core.tablepool.fetch("plugins", 32, 0)
        local values = global_rules.values
        for _, global_rule in config_util.iterate_values(values) do
            api_ctx.conf_type = "global_rule"
            api_ctx.conf_version = global_rule.modifiedIndex
            api_ctx.conf_id = global_rule.value.id

            core.table.clear(plugins)
            plugins = _M.filter(global_rule, plugins)
            if phase_name == nil then
                _M.run_plugin("rewrite", plugins, api_ctx)
                _M.run_plugin("access", plugins, api_ctx)
            else
                _M.run_plugin(phase_name, plugins, api_ctx)
            end
        end

In this step the global rules are iterated over; for each rule, _M.filter(global_rule, plugins) returns the plugins configured on it.

_M.run_plugin then executes the plugins: depending on the phase passed in, it calls each plugin's handler for that phase.

Plugin filtering

Next we look at plugin filtering:

local plugins = plugin.filter(route)
api_ctx.plugins = plugins
plugin.run_plugin("rewrite", plugins, api_ctx)

function _M.filter(user_route, plugins)
    local user_plugin_conf = user_route.value.plugins
    if user_plugin_conf == nil or
       core.table.nkeys(user_plugin_conf) == 0 then
        trace_plugins_info_for_debug(nil)
        -- when 'plugins' is given, always return 'plugins' itself instead
        -- of another one
        return plugins or core.empty_tab
    end

    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    trace_plugins_info_for_debug(plugins)

    return plugins
end

As analyzed in the plugin initialization section above, local_plugins is the list of locally defined plugins, loaded and sorted by priority during worker initialization, while user_plugin_conf is the latest plugin configuration for the route from etcd (after plugin_config.merge and merge_service_route are applied). Their intersection yields the plugins to run; note that the returned plugins list interleaves plugin objects and their configurations ({plugin_obj1, conf1, plugin_obj2, conf2, ...}), which is the shape plugin.run_plugin expects when executing them.

Load balancing

Let’s look at load balancing:

    if up_id then
        local upstream = get_upstream_by_id(up_id)
        api_ctx.matched_upstream = upstream

...
    local code, err = set_upstream(route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", err)
        core.response.exit(code)
    end

    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server

    set_upstream_headers(api_ctx, server)

There are two ways to obtain upstream service nodes. One is to write the IP address and port of each upstream node directly into the route (or upstream) configuration.

The other is to pull available nodes from the service registry. The latter is more recommended for obtaining upstream nodes:

When traffic grows you need to scale the upstream service out, or replace servers when hardware fails. If the gateway itself has to maintain the upstream node information by hand, the maintenance cost under a microservices architecture is easy to imagine; information that is not updated in time affects the business, and the risk of human error cannot be ignored. It therefore makes sense for the gateway to obtain the latest service instance information dynamically from a service registry.
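
The difference can be sketched with the two kinds of upstream definitions, written here as Lua tables purely for illustration (the real objects are JSON stored in etcd, and the exact field set depends on the APISIX version):

-- 1) static upstream: the node list is maintained by hand in the route/upstream
local static_upstream = {
    type = "roundrobin",
    nodes = {
        ["192.168.1.10:8080"] = 1,   -- host:port = weight (illustrative addresses)
        ["192.168.1.11:8080"] = 1,
    },
}

-- 2) discovery-based upstream: only the service name is configured,
--    the node list is pulled from the registry (Eureka here) at runtime
local discovery_upstream = {
    type = "roundrobin",
    service_name = "USER-SERVICE",
    discovery_type = "eureka",
}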

We've already covered service discovery initialization. Official implementations such as Eureka or Nacos use a timer, ngx_timer_every(fetch_interval, fetch_full_registry), to pull the available nodes from the registry at regular intervals, which is what the official documentation calls "quasi-real time".

load_balancer.pick_server(route, api_ctx) then picks a concrete node; at this point unhealthy nodes are filtered out internally (if a health check is configured).