Startup
Checking whether APISIX is already running
local pid_path = env.apisix_home .. "/logs/nginx.pid"
local pid = util.read_file(pid_path)
pid = tonumber(pid)
if pid then
    local lsof_cmd = "lsof -p " .. pid
    local res, err = util.execute_cmd(lsof_cmd)
    if not (res and res == "") then
        if not res then
            print(err)
        else
            print("APISIX is running...")
        end

        return
    end

    print("nginx.pid exists but there's no corresponding process with pid ", pid,
          ", the file will be overwritten")
end
If nginx.pid exists and contains a pid, lsof -p is run to check whether a process with that pid exists. If lsof fails, the error is printed; if it prints any output, APISIX is reported as already running. In both cases startup returns early. If the output is empty, the pid file is stale and will be overwritten.
Otherwise, startup continues with Nginx configuration file generation (the init function covered in the next section), etcd initialization, and execution of the OpenResty start command:
init_etcd(env, args)
util.execute_cmd(env.openresty_args)
Initializing APISIX
local function init(env)
    if env.is_root_path then
        print('Warning! Running apisix under /root is only suitable for '
              .. 'development environments and it is dangerous to do so. '
              .. 'It is recommended to run APISIX in a directory '
              .. 'other than /root.')
    end

    -- read_yaml_conf
    local yaml_conf, err = file.read_yaml_conf(env.apisix_home)
    if not yaml_conf then
        util.die("failed to read local yaml config of apisix: ", err, "\n")
    end
    ...
end
init first prints a warning if APISIX is installed under /root, then calls file.read_yaml_conf(env.apisix_home). The installation path is used to build the path of the configuration file, which is read and handed to tinyyaml for parsing into a Lua table.
local use_openresty_1_17 = false
if not version_greater_equal(or_ver, "1.19.3") then
    use_openresty_1_17 = true
end

local or_info = util.execute_cmd("openresty -V 2>&1")
local with_module_status = true
if or_info and not or_info:find("http_stub_status_module", 1, true) then
    stderr:write("'http_stub_status_module' module is missing in ",
                 "your openresty, please check it out. Without this ",
                 "module, there will be fewer monitoring indicators.\n")
    with_module_status = false
end

local use_apisix_openresty = true
if or_info and not or_info:find("apisix-nginx-module", 1, true) then
    use_apisix_openresty = false
end

local enabled_plugins = {}
for i, name in ipairs(yaml_conf.plugins) do
    enabled_plugins[name] = true
end
....
local sys_conf = {
    use_openresty_1_17 = use_openresty_1_17,
    lua_path = env.pkg_path_org,
    lua_cpath = env.pkg_cpath_org,
    os_name = util.trim(util.execute_cmd("uname")),
    apisix_lua_home = env.apisix_home,
    with_module_status = with_module_status,
    use_apisix_openresty = use_apisix_openresty,
    error_log = {level = "warn"},
    enabled_plugins = enabled_plugins,
    dubbo_upstream_multiplex_count = dubbo_upstream_multiplex_count,
    tcp_enable_ssl = tcp_enable_ssl,
}
....
Next, init prepares the system context: it validates and transforms a series of parameters, collects information about the runtime environment (OpenResty version, compiled-in modules, enabled plugins), and stores the results in sys_conf.
local conf_render = template.compile(ngx_tpl)
local ngxconf = conf_render(sys_conf)
local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",
Finally, lua-resty-template compiles ngx_tpl (the Nginx configuration template) and renders it with the values in sys_conf. The rendered text, ngxconf, is written out as the nginx.conf file.
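To make the rendering step concrete, here is a minimal sketch of the idea in plain Lua. It does not use lua-resty-template: the render function, the {{ name }} placeholder syntax, and the two-line sample template are all hypothetical stand-ins, chosen only so that the example runs on its own.

```lua
-- Hypothetical stand-in for template rendering: replaces {{ name }}
-- placeholders with values from a context table.
local function render(tpl, ctx)
    return (tpl:gsub("{{%s*([%w_]+)%s*}}", function(name)
        local val = ctx[name]
        if val == nil then
            error("missing template variable: " .. name)
        end
        return tostring(val)
    end))
end

-- Sample template and context, both made up for illustration.
local ngx_tpl = "error_log logs/error.log {{ error_log_level }};\n"
             .. "worker_processes {{worker_processes}};\n"
local sys_conf = { error_log_level = "warn", worker_processes = "auto" }

local ngxconf = render(ngx_tpl, sys_conf)
print(ngxconf)
```

The point is the same as in init: the template stays static, and everything environment-specific flows in through one context table.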
Initializing etcd
function _M.init(env, args)
    ...
    local host_count = #(yaml_conf.etcd.host)
    local scheme
    for i = 1, host_count do
        local host = yaml_conf.etcd.host[i]
        local fields = util.split(host, "://")
        if not fields then
            util.die("malformed etcd endpoint: ", host, "\n")
        end

        if not scheme then
            scheme = fields[1]
        elseif scheme ~= fields[1] then
            print([[WARNING: mixed protocols among etcd endpoints]])
        end
    end

    -- check the etcd cluster version
    for index, host in ipairs(yaml_conf.etcd.host) do
        local version_url = host .. "/version"
        local errmsg

        local res, err
        local retry_time = 0
        while retry_time < 2 do
            res, err = request(version_url, yaml_conf)
            -- In case of failure, request returns nil followed by an error message.
            -- Else the first return value is the response body
            -- and followed by the response status code.
            if res then
                break
            end
            retry_time = retry_time + 1
            print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                             version_url, err, retry_time))
        end

        if not res then
            errmsg = str_format("request etcd endpoint \'%s\' error, %s\n", version_url, err)
            util.die(errmsg)
        end

        local body, _, err = dkjson.decode(res)
        if err or (body and not body["etcdcluster"]) then
            errmsg = str_format("got malformed version message: \"%s\" from etcd \"%s\"\n", res,
                                version_url)
            util.die(errmsg)
        end

        local cluster_version = body["etcdcluster"]
        if compare_semantic_version(cluster_version, env.min_etcd_version) then
            util.die("etcd cluster version ", cluster_version,
                     " is less than the required version ",
                     env.min_etcd_version,
                     ", please upgrade your etcd cluster\n")
        end
    end
etcd initialization starts by reading and validating the configuration file and parsing the endpoints of the etcd cluster. It then requests /version from each endpoint, trying at most twice, and uses compare_semantic_version to check that the cluster version is not lower than env.min_etcd_version.
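The two ideas in this step, a bounded retry and a numeric version comparison, can be sketched in plain Lua as follows. The helper names parse_version, version_less_than, and with_retry are hypothetical, and the sketch ignores pre-release tags; it only mirrors how the result of compare_semantic_version is used above (true means "too old").

```lua
-- Splits "3.4.0" into {3, 4, 0}.
local function parse_version(ver)
    local parts = {}
    for num in ver:gmatch("%d+") do
        parts[#parts + 1] = tonumber(num)
    end
    return parts
end

-- Returns true when version a is strictly lower than version b.
local function version_less_than(a, b)
    local pa, pb = parse_version(a), parse_version(b)
    for i = 1, math.max(#pa, #pb) do
        local x, y = pa[i] or 0, pb[i] or 0
        if x ~= y then
            return x < y
        end
    end
    return false
end

-- Calls fn up to max_tries times and returns its first successful result.
local function with_retry(fn, max_tries)
    local res, err
    for _ = 1, max_tries do
        res, err = fn()
        if res then
            return res
        end
    end
    return nil, err
end

-- Simulated endpoint: fails once, then answers with a version body.
local calls = 0
local body = with_retry(function()
    calls = calls + 1
    if calls < 2 then
        return nil, "connection refused"
    end
    return '{"etcdcluster":"3.4.0"}'
end, 2)

local cluster_version = body:match('"etcdcluster":"([^"]+)"')
print(cluster_version, version_less_than(cluster_version, "3.4.0"))
```

Comparing the numeric parts one by one matters because a plain string comparison would rank "3.10.0" below "3.4.0".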
for index, host in ipairs(yaml_conf.etcd.host) do
    local is_success = true

    local errmsg
    local auth_token
    local user = yaml_conf.etcd.user
    local password = yaml_conf.etcd.password
    if user and password then
        local auth_url = host .. "/v3/auth/authenticate"
        local json_auth = {
            name = etcd_conf.user,
            password = etcd_conf.password
        }

        local post_json_auth = dkjson.encode(json_auth)
        local response_body = {}

        local res, err
        local retry_time = 0
        while retry_time < 2 do
            res, err = request({
                url = auth_url,
                method = "POST",
                source = ltn12.source.string(post_json_auth),
                sink = ltn12.sink.table(response_body),
                headers = {
                    ["Content-Length"] = #post_json_auth
                }
            }, yaml_conf)
            -- In case of failure, request returns nil followed by an error message.
            -- Else the first return value is just the number 1
            -- and followed by the response status code.
            if res then
                break
            end
            retry_time = retry_time + 1
            print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                             auth_url, err, retry_time))
        end

        if not res then
            errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", auth_url, err)
            util.die(errmsg)
        end

        local res_auth = table_concat(response_body)
        local body_auth, _, err_auth = dkjson.decode(res_auth)
        if err_auth or (body_auth and not body_auth["token"]) then
            errmsg = str_format("got malformed auth message: \"%s\" from etcd \"%s\"\n",
                                res_auth, auth_url)
            util.die(errmsg)
        end

        auth_token = body_auth.token
    end
If a user and password are configured, /v3/auth/authenticate is called to obtain an etcd token. Subsequent requests carry the token in the Authorization request header for authentication.
    local dirs = {}
    for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
        dirs[name] = true
    end
    for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
        dirs[name] = true
    end

    for dir_name in pairs(dirs) do
        local key = (etcd_conf.prefix or "") .. dir_name .. "/"

        local put_url = host .. "/v3/kv/put"
        local post_json = '{"value":"' .. base64_encode("init_dir")
                          .. '", "key":"' .. base64_encode(key) .. '"}'
        local response_body = {}
        local headers = {["Content-Length"] = #post_json}
        if auth_token then
            headers["Authorization"] = auth_token
        end

        local res, err
        local retry_time = 0
        while retry_time < 2 do
            res, err = request({
                url = put_url,
                method = "POST",
                source = ltn12.source.string(post_json),
                sink = ltn12.sink.table(response_body),
                headers = headers
            }, yaml_conf)
            retry_time = retry_time + 1
            if res then
                break
            end
            print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                             put_url, err, retry_time))
        end

        if not res then
            errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", put_url, err)
            util.die(errmsg)
        end

        local res_put = table_concat(response_body)
        if res_put:find("404 page not found", 1, true) then
            errmsg = str_format("gRPC gateway is not enabled in etcd cluster \"%s\",",
                                "which is required by Apache APISIX\n")
            util.die(errmsg)
        end

        if res_put:find("error", 1, true) then
            is_success = false
            if (index == host_count) then
                errmsg = str_format("got malformed key-put message: \"%s\" from etcd \"%s\"\n",
                                    res_put, put_url)
                util.die(errmsg)
            end

            break
        end

        if args and args["verbose"] then
            print(res_put)
        end
    end

    if is_success then
        etcd_ok = true
        break
    end
end

if not etcd_ok then
    util.die("none of the configured etcd works well")
end
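Each directory is created by writing a placeholder key through /v3/kv/put, with the request body built as post_json = '{"value":"' .. base64_encode("init_dir") .. '", "key":"' .. base64_encode(key) .. '"}'. Once the directories have been created successfully, the structure in etcd looks like this: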
@11d2d7374c12:/$ etcdctl get --prefix ""
/apisix/consumers/
init_dir
/apisix/data_plane/server_info/039a6657-8d21-4b3b-b364-32c1ad92a6a6
{"id":"039a6657-8d21-4b3b-b364-32c1ad92a6a6","etcd_version":"3.4.0","version":"2.5","hostname":"4364cdfb7e75","up_time":13941,"boot_time":1625041981,"last_report_time":1625055922}
/apisix/global_rules/
init_dir
/apisix/plugin_configs/
init_dir
/apisix/plugin_metadata/
init_dir
/apisix/plugins
[{"name":"api-breaker"},{"name":"authz-keycloak"},{"name":"basic-auth"},{"name":"batch-requests"},{"name":"consumer-restriction"},{"name":"cors"},{"name":"echo"},{"name":"fault-injection"},{"name":"grpc-transcode"},{"name":"hmac-auth"},{"name":"http-logger"},{"name":"ip-restriction"},{"name":"jwt-auth"},{"name":"kafka-logger"},{"name":"key-auth"},{"name":"limit-conn"},{"name":"limit-count"},{"name":"limit-req"},{"name":"openid-connect"},{"name":"prometheus"},{"name":"proxy-cache"},{"name":"proxy-mirror"},{"name":"proxy-rewrite"},{"name":"redirect"},{"name":"referer-restriction"},{"name":"request-id"},{"name":"request-validation"},{"name":"response-rewrite"},{"name":"serverless-post-function"},{"name":"serverless-pre-function"},{"name":"sls-logger"},{"name":"syslog"},{"name":"tcp-logger"},{"name":"udp-logger"},{"name":"uri-blocker"},{"name":"wolf-rbac"},{"name":"zipkin"},{"name":"server-info"},{"name":"traffic-split"},{"name":"mqtt-proxy","stream":true}]
/apisix/plugins/
init_dir
/apisix/proto/
init_dir
/apisix/routes/
init_dir
/apisix/services/
init_dir
/apisix/ssl/
init_dir
/apisix/stream_routes/
init_dir
/apisix/upstreams/
init_dir
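The put request body has to carry both key and value base64-encoded, because that is what etcd's JSON gateway expects. The sketch below shows how such a body can be assembled in plain Lua. The encoder is a self-written stand-in for the base64_encode used in the source, and build_put_body, the "/apisix" prefix, and the "/routes" directory name are assumptions made for illustration.

```lua
local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

-- Plain-Lua base64 encoder (stand-in for the base64_encode used above).
local function base64_encode(s)
    local out = {}
    for i = 1, #s, 3 do
        local a, b, c = s:byte(i, i + 2)
        -- Pack up to three bytes into one 24-bit number.
        local n = a * 65536 + (b or 0) * 256 + (c or 0)
        local i1 = math.floor(n / 262144) % 64
        local i2 = math.floor(n / 4096) % 64
        local i3 = math.floor(n / 64) % 64
        local i4 = n % 64
        out[#out + 1] = B64:sub(i1 + 1, i1 + 1)
        out[#out + 1] = B64:sub(i2 + 1, i2 + 1)
        -- Missing input bytes are padded with "=".
        out[#out + 1] = b and B64:sub(i3 + 1, i3 + 1) or "="
        out[#out + 1] = c and B64:sub(i4 + 1, i4 + 1) or "="
    end
    return table.concat(out)
end

-- Builds the JSON body for one directory placeholder key.
local function build_put_body(prefix, dir_name)
    local key = (prefix or "") .. dir_name .. "/"
    return '{"value":"' .. base64_encode("init_dir")
           .. '", "key":"' .. base64_encode(key) .. '"}'
end

print(base64_encode("init_dir"))
print(build_put_body("/apisix", "/routes"))
```

The value init_dir is only a marker; what matters is that a key ending in "/" exists under the prefix for every directory.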
Starting OpenResty
APISIX is an application built on OpenResty, so the last step is to start OpenResty with util.execute_cmd(env.openresty_args), where the arguments are defined as:
local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]] .. apisix_home .. [[/conf/nginx.conf]]
Main process initialization
The previously generated Nginx configuration file contains:
init_by_lua_block {
    require "resty.core"
    apisix = require("apisix")

    local dns_resolver = { "127.0.0.11", }
    local args = {
        dns_resolver = dns_resolver,
    }
    apisix.http_init(args)
}
So the main process is initialized by calling apisix.http_init:
function _M.http_init(args)
    require("resty.core")

    if require("ffi").os == "Linux" then
        require("ngx.re").opt("jit_stack_size", 200 * 1024)
    end

    require("jit.opt").start("minstitch=2", "maxtrace=4000",
                             "maxrecord=8000", "sizemcode=64",
                             "maxmcode=4000", "maxirconst=1000")

    core.resolver.init_resolver(args)
    core.id.init()

    local process = require("ngx.process")
    local ok, err = process.enable_privileged_agent()
    if not ok then
        core.log.error("failed to enable privileged_agent: ", err)
    end

    if core.config.init then
        local ok, err = core.config.init()
        if not ok then
            core.log.error("failed to load the configuration: ", err)
        end
    end
end
This function sets the JIT parameters, initializes the DNS resolver, generates the APISIX instance id (core.id.init()), enables the privileged agent process, and finally calls core.config.init().
core.config.init()
Let’s first look at the definition of config in core.lua:
local config_center = local_conf.apisix and local_conf.apisix.config_center
                      or "etcd"
log.info("use config_center: ", config_center)
local config = require("apisix.core.config_" .. config_center)
config.type = config_center
The config_center set in the local configuration is used; if none is set, it defaults to etcd, in which case config is the module apisix.core.config_etcd (config_etcd.lua).
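The defaulting relies on Lua's `a and b or c` idiom. A small sketch, in which config_module_name is a hypothetical helper that only builds the module name instead of requiring it:

```lua
-- local_conf mimics the parsed local YAML configuration.
local function config_module_name(local_conf)
    -- Falls back to "etcd" when either apisix or config_center is missing.
    local config_center = local_conf.apisix and local_conf.apisix.config_center
                          or "etcd"
    return "apisix.core.config_" .. config_center, config_center
end

print(config_module_name({}))
print(config_module_name({ apisix = { config_center = "yaml" } }))
```

Because the module name is assembled from a string, switching the configuration source is a matter of changing one value in the local configuration.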
Now look at its init operation:
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
        return true
    end

    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end

    return true
end
The key call is local res, err = readdir(etcd_cli, prefix, create_formatter(prefix)).
create_formatter(prefix) builds a formatter from the etcd prefix set in the local configuration (default: "/apisix").
readdir reads the directory contents from etcd and formats them with that formatter:
local function readdir(etcd_cli, key, formatter)
    if not etcd_cli then
        return nil, "not inited"
    end

    local res, err = etcd_cli:readdir(key)
    if not res then
        -- log.error("failed to get key from etcd: ", err)
        return nil, err
    end

    if type(res.body) ~= "table" then
        return nil, "failed to read etcd dir"
    end

    res, err = etcd_apisix.get_format(res, key .. '/', true, formatter)
    if not res then
        return nil, err
    end

    return res
end
readdir first calls the etcd client's readdir function to pull the contents under the given key from etcd (here the key is /apisix, the root directory), then calls etcd_apisix.get_format(res, key .. '/', true, formatter) to process the data further with the formatter that was passed in:
function _M.get_format(res, real_key, is_dir, formatter)
    ...
    if formatter then
        return formatter(res)
    end
    ...
end
The formatter iterates over the data returned by etcd and stores it in loaded_configuration, a table defined in config_etcd, for later use.
Worker process initialization
apisix.http_init_worker() initializes the worker process by calling the init_worker() function of each component. Let's focus on the following steps:
function _M.http_init_worker()
    ...
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    ...
    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end
    ...
end
Service discovery initialization
The first step is service discovery initialization (if configured):
if discovery_type then
    for discovery_name, _ in pairs(discovery_type) do
        log.info("use discovery: ", discovery_name)
        discovery[discovery_name] = require("apisix.discovery." .. discovery_name)
    end
end

function discovery.init_worker()
    if discovery_type then
        for discovery_name, _ in pairs(discovery_type) do
            discovery[discovery_name].init_worker()
        end
    end
end
In apisix/discovery/init.lua, the init_worker() of every service discovery component enabled in the local configuration is called in turn. Take Eureka as an example:
function _M.init_worker()
    if not local_conf.discovery.eureka or
        not local_conf.discovery.eureka.host or #local_conf.discovery.eureka.host == 0 then
        error("do not set eureka.host")
        return
    end

    local ok, err = core.schema.check(schema, local_conf.discovery.eureka)
    if not ok then
        error("invalid eureka configuration: " .. err)
        return
    end
    default_weight = local_conf.discovery.eureka.weight or 100
    log.info("default_weight:", default_weight, ".")
    local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
    log.info("fetch_interval:", fetch_interval, ".")
    ngx_timer_at(0, fetch_full_registry)
    ngx_timer_every(fetch_interval, fetch_full_registry)
Once the configuration has been validated, ngx_timer_at(0, fetch_full_registry) immediately pulls the available service nodes from Eureka. ngx_timer_every(fetch_interval, fetch_full_registry) then starts a recurring timer that pulls node data at the locally configured fetch_interval (30 seconds by default).
Plug-in initialization
Next comes the initialization of the plug-in:
local_conf = core.config.local_conf(true)
http_plugin_names = local_conf.plugins
stream_plugin_names = local_conf.stream_plugins

local function load(plugin_names)
    local processed = {}
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            processed[name] = true
        end
    end

    core.log.warn("new plugins: ", core.json.delay_encode(processed))

    for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for name in pairs(processed) do
        load_plugin(name, local_plugins)
    end
    ...
end
Plugin initialization first reads the plugin list from the local configuration and removes duplicate names, then unloads the previously loaded plugins, and finally loads the plugins one by one:
local function load_plugin(name, plugins_list, is_stream_plugin)
    local pkg_name = "apisix.plugins." .. name
    if is_stream_plugin then
        pkg_name = "apisix.stream.plugins." .. name
    end

    local ok, plugin = pcall(require, pkg_name)
    ...
    plugin.name = name
    plugin.attr = plugin_attr(name)
    core.table.insert(plugins_list, plugin)

    if plugin.init then
        plugin.init()
    end

    return
end
load_plugin builds the module name by appending the plugin name to "apisix.plugins." (or "apisix.stream.plugins." for stream plugins) and loads it with require inside pcall. It then attaches the locally configured plugin attributes via plugin_attr(name), inserts the plugin into the local_plugins list defined in plugin.lua, and finally calls the plugin's own init function if it has one.
-- sort by plugin's priority
if #local_plugins > 1 then
    sort_tab(local_plugins, sort_plugin)
end

for i, plugin in ipairs(local_plugins) do
    local_plugins_hash[plugin.name] = plugin
    if local_conf and local_conf.apisix
       and local_conf.apisix.enable_debug then
        core.log.warn("loaded plugin and sort by priority:",
                      " ", plugin.priority,
                      " name: ", plugin.name)
    end
end

_M.load_times = _M.load_times + 1
core.log.info("load plugin times: ", _M.load_times)
return true
After all local plugins are loaded, the plugin list is sorted by priority, and each plugin is also stored in the local_plugins_hash table, keyed by plugin name with the plugin object as the value.
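A minimal sketch of keeping the same plugins in both a sorted list and a name-keyed table. The plugin names and priorities are made up, and the comparison direction in sort_plugin (higher priority first) is an assumption, since the comparator itself is not shown above.

```lua
-- Made-up plugins in arbitrary load order.
local local_plugins = {
    { name = "plugin-low",  priority = 10 },
    { name = "plugin-high", priority = 3000 },
    { name = "plugin-mid",  priority = 500 },
}
local local_plugins_hash = {}

-- Assumption: a higher priority value sorts first.
local function sort_plugin(l, r)
    return l.priority > r.priority
end

if #local_plugins > 1 then
    table.sort(local_plugins, sort_plugin)
end

-- The list keeps execution order; the hash gives O(1) lookup by name.
for _, plugin in ipairs(local_plugins) do
    local_plugins_hash[plugin.name] = plugin
end

for i, plugin in ipairs(local_plugins) do
    print(i, plugin.name, plugin.priority)
end
```

Both structures point at the same plugin tables, so nothing is copied; the list answers "in what order" and the hash answers "is this plugin loaded".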
Route initialization
function _M.http_init_worker()
    local conf = core.config.local_conf()
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        router_ssl_name = conf.apisix.router.ssl or router_ssl_name
    end

    local router_http = require("apisix.http.router." .. router_http_name)
    attach_http_router_common_methods(router_http)
    router_http.init_worker(filter)
    _M.router_http = router_http

    local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
    router_ssl.init_worker()
    _M.router_ssl = router_ssl
APISIX provides three route matching implementations: radixtree_host_uri, radixtree_uri, and radixtree_uri_with_parameter. By default, radixtree_uri (a compressed prefix tree) is used. attach_http_router_common_methods(router_http) binds two default methods to the router module (radixtree_uri.lua here) when the module does not provide its own implementation:
init_worker(): calls the new function in apisix.core.config_etcd.lua (the default config center) to pull the route information from the /routes directory in etcd and assigns it to user_routes.
routes(): returns all routes pulled by init_worker() above (route data and version).
Once the methods are bound, init_worker() is called to pull the routes, and _M.router_http = router_http saves the initialized radixtree_uri router in router.lua. Route matching later goes through router.router_http.match(api_ctx).
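The "bind a default only when the module has no implementation of its own" step can be sketched like this. attach_common_methods, the defaults table, and custom_router are hypothetical; the real functions work on module-level state rather than on self.

```lua
-- Attaches default implementations only for methods the router lacks.
local function attach_common_methods(router, defaults)
    for name, fn in pairs(defaults) do
        if router[name] == nil then
            router[name] = fn
        end
    end
    return router
end

local defaults = {
    init_worker = function(self)
        self.user_routes = { values = {}, conf_version = 0 }
    end,
    routes = function(self)
        return self.user_routes.values, self.user_routes.conf_version
    end,
}

-- A made-up router that brings its own init_worker but no routes().
local custom_router = {
    init_worker = function(self)
        self.user_routes = { values = { "r1" }, conf_version = 7 }
    end,
}

attach_common_methods(custom_router, defaults)
custom_router:init_worker()              -- the custom version runs
local values, version = custom_router:routes()   -- the default version runs
print(#values, version)
```

This keeps each router implementation free to override either method while still guaranteeing that callers can rely on both being present.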
Data update
The new function in apisix.core.config_etcd.lua was mentioned above; it retrieves the data of a specified directory from etcd:
function _M.new(key, opts)
    ...
    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker

    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
    }, mt)
    ...
obj is the object that is eventually returned. It predefines a set of fields with their initial values, and setmetatable() assigns it the metatable mt:
local mt = {
    __index = _M,
    __tostring = function(self)
        return " etcd key: " .. self.key
    end
}

local _M = {
    version = 0.3,
    local_conf = config_local.local_conf,
    clear_local_cache = config_local.clear_cache,
}
A side note: mt defines an __index field. When a table is indexed with a key that has no value, Lua looks up the __index field in the table's metatable. If __index holds a table, Lua looks up the key in that table instead; if it holds a function, Lua calls it with the table and the key.
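A short standalone illustration of both forms of __index. The key "/apisix/routes" and the lazy table are made up for the example.

```lua
local _M = { version = 0.3 }

local mt = {
    __index = _M,
    __tostring = function(self)
        return " etcd key: " .. self.key
    end,
}

local obj = setmetatable({ key = "/apisix/routes" }, mt)

print(rawget(obj, "version"))   -- the field is not stored on obj itself
print(obj.version)              -- found in _M through __index
print(tostring(obj))            -- uses __tostring

-- __index can also be a function that computes missing values.
local lazy = setmetatable({}, {
    __index = function(_, key)
        return "computed:" .. key
    end,
})
print(lazy.anything)
```

The table form is what gives every object returned by new access to the functions defined on _M, without copying them into each object.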
Moving on to the new function:
if automatic then
    if not key then
        return nil, "missing `key` argument"
    end

    if loaded_configuration[key] then
        local res = loaded_configuration[key]
        loaded_configuration[key] = nil -- tried to load

        log.notice("use loaded configuration ", key)

        local dir_res, headers = res.body, res.headers
        load_full_data(obj, dir_res, headers)
    end

    ngx_timer_at(0, _automatic_fetch, obj)

else
    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end
    obj.etcd_cli = etcd_cli
end

if key then
    created_obj[key] = obj
end

return obj
When automatic is true, new first checks whether loaded_configuration already holds data for the key (cached during main process initialization). If it does, load_full_data assigns that data directly to obj and sets need_reload to false, which skips the next readdir request.
ngx_timer_at(0, _automatic_fetch, obj)
ngx_timer_at creates an Nginx timer. In its event loop, Nginx finds the expired timer and runs the associated Lua callback in a separate coroutine; here the callback is _automatic_fetch, which implements the automatic update logic.
Next, let's look at the automatic update logic, which is the core of how APISIX uses etcd to implement hot updates and hot-pluggable plugins:
local function _automatic_fetch(premature, self)
    if premature then
        return
    end

    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end

            local ok, err = sync_data(self)
            ...
    end

    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end
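Note the loop condition while not exiting() and self.running and i <= 32 do: each timer callback runs the sync loop at most 33 times (i goes from 0 to 32). The value 32 appears nowhere else in this code, and I am not sure why it was chosen. What the code does show is that when the loop ends and the worker is not exiting, the callback re-arms itself with ngx_timer_at(0, _automatic_fetch, self), so the cap limits how long a single timer callback lives rather than how long syncing continues.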
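The "bounded loop, then re-arm" pattern of _automatic_fetch can be simulated outside OpenResty. In this sketch the pending queue and timer_at are hypothetical stand-ins for Nginx's timer machinery, the sync step is reduced to a counter, and stop_after is an invented knob that ends the simulation.

```lua
local pending = {}   -- stand-in for Nginx's timer queue

-- Hypothetical stand-in for ngx_timer_at: queues the callback.
local function timer_at(_, callback, arg)
    pending[#pending + 1] = function()
        callback(false, arg)
    end
end

local function automatic_fetch(premature, self)
    if premature then
        return
    end

    local i = 0
    while self.running and i <= 32 do
        i = i + 1
        self.sync_times = self.sync_times + 1    -- one simulated sync_data call
        if self.sync_times >= self.stop_after then
            self.running = false
        end
    end

    self.timer_runs = self.timer_runs + 1
    if self.running then
        timer_at(0, automatic_fetch, self)       -- re-arm with a fresh timer
    end
end

local obj = { running = true, sync_times = 0, timer_runs = 0, stop_after = 100 }
timer_at(0, automatic_fetch, obj)

-- Drain the queue the way an event loop would.
while #pending > 0 do
    local callback = table.remove(pending, 1)
    callback()
end

print(obj.sync_times, obj.timer_runs)
```

With these numbers, 100 sync steps are spread over four timer callbacks: three full runs of 33 iterations and one final run of a single iteration.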
Here, local etcd_cli, err = get_etcd() obtains the etcd client and stores it on the object, and local ok, err = sync_data(self) performs the data update:
local function sync_data(self)
    if not self.key then
        return nil, "missing 'key' arguments"
    end

    if self.need_reload then
        local res, err = readdir(self.etcd_cli, self.key)
        ...
        -- run the clean handlers of the old data, then drop it
        if self.values then
            for i, val in ipairs(self.values) do
                if val and val.clean_handlers then
                    for _, clean_handler in ipairs(val.clean_handlers) do
                        clean_handler(val)
                    end
                    val.clean_handlers = nil
                end
            end

            self.values = nil
            self.values_hash = nil
        end

        -- assign the new data
        load_full_data(self, dir_res, headers)

        return true
    end

    local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)
    log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
    log.info("res: ", json.delay_encode(dir_res, true))
    ...
    local res_copy = res
    -- waitdir will return [res] even for self.single_item = true
    for _, res in ipairs(res_copy) do
        ...
        self:upgrade_version(res.modifiedIndex)
        ...
        local pre_index = self.values_hash[key]
        if pre_index then
            local pre_val = self.values[pre_index]
            if pre_val and pre_val.clean_handlers then
                for _, clean_handler in ipairs(pre_val.clean_handlers) do
                    clean_handler(pre_val)
                end
                pre_val.clean_handlers = nil
            end

            if res.value then
                if not self.single_item then
                    res.value.id = key
                end

                self.values[pre_index] = res
                res.clean_handlers = {}
                log.info("update data by key: ", key)

            else
                self.sync_times = self.sync_times + 1
                self.values[pre_index] = false
                self.values_hash[key] = nil
                log.info("delete data by key: ", key)
            end

        elseif res.value then
            res.clean_handlers = {}
            insert_tab(self.values, res)
            self.values_hash[key] = #self.values
            if not self.single_item then
                res.value.id = key
            end

            log.info("insert data by key: ", key)
        end
        ...
        if self.filter then
            self.filter(res)
        end

        self.conf_version = self.conf_version + 1
    end

    return self.values
end
As for need_reload: the default /apisix directory was already loaded once during main process initialization, so for data found there it is false. Any other directory has to be loaded for the first time at this point, via local res, err = readdir(self.etcd_cli, self.key). After that, local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout) waits for changes. Internally it runs local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts), which calls the watchdir function of lua-resty-etcd; in other words, etcd's watch mechanism is used to listen for changes under the specified key.
insert_tab(self.values, res)
self.values_hash[key] = #self.values
self.conf_version = self.conf_version + 1
When updated data arrives, the values held by obj are updated and conf_version is incremented.
In short, new returns a composite object that holds the etcd client, the directory data (values), the filter, and so on, and the automatic option determines whether it watches the etcd directory for automatic updates.
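The insert, update, and delete bookkeeping in sync_data boils down to a list (values) plus an index from key to list position (values_hash). A reduced sketch, in which new_store and apply_event are hypothetical names and clean handlers, filters, and schema checks are left out:

```lua
local function new_store()
    return { values = {}, values_hash = {}, conf_version = 0, sync_times = 0 }
end

-- Applies one watch event; a res without value means the key was deleted.
local function apply_event(self, key, res)
    local pre_index = self.values_hash[key]
    if pre_index then
        if res.value then
            self.values[pre_index] = res        -- update in place
        else
            self.sync_times = self.sync_times + 1
            self.values[pre_index] = false      -- keep the slot, mark it empty
            self.values_hash[key] = nil
        end
    elseif res.value then
        table.insert(self.values, res)          -- new key: append
        self.values_hash[key] = #self.values
    end

    self.conf_version = self.conf_version + 1
end

local store = new_store()
apply_event(store, "1", { value = { uri = "/a" } })
apply_event(store, "2", { value = { uri = "/b" } })
apply_event(store, "1", { value = { uri = "/a2" } })
apply_event(store, "2", {})

print(store.conf_version, store.values[1].value.uri, store.values[2])
```

Marking a deleted slot with false instead of removing it keeps the positions recorded in values_hash valid for all other keys, and every event bumps conf_version so that consumers can tell their cached view is stale.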
Request processing
Next, let's follow a request as it comes in:
access_by_lua_block {
    apisix.http_access_phase()
}
ngx_ctx preprocessing
function _M.http_access_phase()
    local ngx_ctx = ngx.ctx

    if ngx_ctx.api_ctx and ngx_ctx.api_ctx.ssl_client_verified then
        local res = ngx_var.ssl_client_verify
        if res ~= "SUCCESS" then
            if res == "NONE" then
                core.log.error("client certificate was not present")
            else
                core.log.error("clent certificate verification is not passed: ", res)
            end

            return core.response.exit(400)
        end
    end

    -- always fetch table from the table pool, we don't need a reused api_ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    ngx_ctx.api_ctx = api_ctx

    core.ctx.set_vars_meta(api_ctx)
    ...
ngx.ctx is the request context. It has the same lifetime as a single request and is used to share data between the phases of that request. core.ctx.set_vars_meta(api_ctx) sets a metatable on api_ctx.var, which is what the code reads whenever it needs a request variable. As with the __index mechanism described above, when a key cannot be found in the var table, the __index function in the metatable is called. That function defines several ways of obtaining the corresponding Nginx variable through LuaJIT FFI, which performs better than ngx.var.*. It also caches the result:
local cached = t._cache[key]
if cached ~= nil then
    return cached
end
....
if val ~= nil then
    t._cache[key] = val
end
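Put together, the caching lookup behaves like the following sketch. fetch_nginx_var and its fake values are hypothetical stand-ins for the FFI-based variable lookup; only the _cache handling follows the snippet above.

```lua
local lookups = 0

-- Hypothetical slow source of request variables.
local function fetch_nginx_var(key)
    lookups = lookups + 1
    local fake = { host = "example.com", request_method = "GET" }
    return fake[key]
end

local var_mt = {
    __index = function(t, key)
        -- _cache is a real field of t, so this read does not recurse.
        local cached = t._cache[key]
        if cached ~= nil then
            return cached
        end

        local val = fetch_nginx_var(key)
        if val ~= nil then
            t._cache[key] = val
        end
        return val
    end,
}

local function new_var()
    return setmetatable({ _cache = {} }, var_mt)
end

local var = new_var()
local first = var.host    -- goes to fetch_nginx_var
local second = var.host   -- served from _cache
print(first, second, lookups)
```

Since the var table lives only as long as the request context, the cache needs no invalidation: it disappears together with the request.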
Route matching
...
if router.api.has_route_not_under_apisix() or
   core.string.has_prefix(uri, "/apisix/")
then
    local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api
    local matched = router.api.match(api_ctx, skip)
    if matched then
        return
    end
end

router.router_http.match(api_ctx)

-- run global rule
plugin.run_global_rules(api_ctx, router.global_rules, nil)
...
Has_route_not_under_apisix () defaults to true. If you look at the reference, you can see that the fetch_API_router () function assigns has_route_not_under_apisix: That is, the list of local plug-ins is traversed, true if the plug-in provides an API, such as Prometheus’ export_uri, which is not prefixed with /apisix/;
Core.string. has_prefix(uri, “/apisix/”) determines whether the current request uri is prefixed with /apisix/
If either of the two conditions is met, the matching logic of router.api.match is used. If a match is made, return is returned. According to the preceding information, services should avoid prefix with /apisix when defining routing rules. Generally, apis exposed by Apisix or plug-ins are prefix with /apisix.
Then let’s look at the real (business-level) routing matching rules:
Router_http: radixtree_uri Router_http: radixtree_uri Router_http: radixtree_uri
function _M.match(api_ctx)
    local user_routes = _M.user_routes
    if not cached_version or cached_version ~= user_routes.conf_version then
        uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                             uri_routes, false)
        cached_version = user_routes.conf_version
    end

    if not uri_router then
        core.log.error("failed to fetch valid `uri` router: ")
        return true
    end

    return base_router.match_uri(uri_router, match_opts, api_ctx)
end
cached_version is initially nil, and user_routes is the object returned by new. Whenever cached_version is nil or differs from user_routes.conf_version, uri_router is rebuilt from the latest route data obtained from etcd and the cached version is updated. Finally, uri_router is passed as an argument to base_router.match_uri (the match_uri function of apisix.http.route):
function _M.match_uri(uri_router, match_opts, api_ctx)
    core.table.clear(match_opts)
    match_opts.method = api_ctx.var.request_method
    match_opts.host = api_ctx.var.host
    match_opts.remote_addr = api_ctx.var.remote_addr
    match_opts.vars = api_ctx.var
    match_opts.matched = core.tablepool.fetch("matched_route_record", 0, 4)

    local ok = uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx, match_opts)
    return ok
end
uri_router:dispatch ultimately performs the route matching.
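The version check in match is what turns a data update into a router rebuild. A reduced sketch of that pattern: build_router is a hypothetical stand-in that builds an exact-match table instead of a radix tree, and the routes are made up.

```lua
local builds = 0
local cached_version
local uri_router

-- Hypothetical stand-in for building the radix tree.
local function build_router(routes)
    builds = builds + 1
    local index = {}
    for _, route in ipairs(routes) do
        index[route.uri] = route
    end
    return index
end

local function match(user_routes, uri)
    -- Rebuild only when the route data has a new version.
    if not cached_version or cached_version ~= user_routes.conf_version then
        uri_router = build_router(user_routes.values)
        cached_version = user_routes.conf_version
    end
    return uri_router[uri]
end

local user_routes = { conf_version = 1, values = { { uri = "/a", id = "1" } } }
match(user_routes, "/a")
match(user_routes, "/a")                 -- same version: no rebuild

table.insert(user_routes.values, { uri = "/b", id = "2" })
user_routes.conf_version = 2             -- what a sync from etcd would do
local hit = match(user_routes, "/b")

print(builds, hit and hit.id)
```

The cost of rebuilding is paid once per configuration change, on the first request that notices the new version, and not on every request.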
plugin.run_global_rules(api_ctx, router.global_rules, nil) then runs the global rules:
local plugins = core.tablepool.fetch("plugins", 32, 0)
local values = global_rules.values
for _, global_rule in config_util.iterate_values(values) do
    api_ctx.conf_type = "global_rule"
    api_ctx.conf_version = global_rule.modifiedIndex
    api_ctx.conf_id = global_rule.value.id

    core.table.clear(plugins)
    plugins = _M.filter(global_rule, plugins)
    if phase_name == nil then
        _M.run_plugin("rewrite", plugins, api_ctx)
        _M.run_plugin("access", plugins, api_ctx)
    else
        _M.run_plugin(phase_name, plugins, api_ctx)
    end
end
This step iterates over the global rules. For each rule, _M.filter(global_rule, plugins) returns the plugins configured on it.
_M.run_plugin then executes them: for each plugin it calls the function that belongs to the given phase. When phase_name is nil, the rewrite and access phases are run in turn.
Plugin filtering
Next we look at plugin filtering:
local plugins = plugin.filter(route)
api_ctx.plugins = plugins
plugin.run_plugin("rewrite", plugins, api_ctx)

function _M.filter(user_route, plugins)
    local user_plugin_conf = user_route.value.plugins
    if user_plugin_conf == nil or
       core.table.nkeys(user_plugin_conf) == 0 then
        trace_plugins_info_for_debug(nil)
        -- when 'plugins' is given, always return 'plugins' itself instead
        -- of another one
        return plugins or core.empty_tab
    end

    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    trace_plugins_info_for_debug(plugins)

    return plugins
end
As covered in plugin initialization above, local_plugins is the list of locally defined plugins, initialized and sorted by priority during worker process initialization. user_plugin_conf is the latest plugin configuration from etcd for the matched route (after merging via plugin_config.merge and merge_service_route). The intersection of the two, excluding plugins marked disable, is the set of runnable plugins. The returned list interleaves each plugin object with its configuration, and the plugins are executed by calling plugin.run_plugin.
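The filter and run steps can be tried out with a reduced sketch. The plugin names, handlers, and the route are made up, and run_plugin here takes only (phase, plugins) and passes just the configuration to each handler, which is a simplification of the real signature.

```lua
local log = {}

-- Made-up local plugins, already sorted by priority.
local local_plugins = {
    { name = "auth", priority = 2500,
      rewrite = function(conf) log[#log + 1] = "auth:" .. conf.key end },
    { name = "limit", priority = 1000,
      access = function(conf) log[#log + 1] = "limit:" .. conf.rate end },
    { name = "trace", priority = 500,
      rewrite = function() log[#log + 1] = "trace" end },
}

-- Keeps plugins that are loaded locally AND enabled on the route.
-- Result layout: {obj1, conf1, obj2, conf2, ...}
local function filter(user_route)
    local plugins = {}
    local user_plugin_conf = user_route.value.plugins or {}
    for _, plugin_obj in ipairs(local_plugins) do
        local plugin_conf = user_plugin_conf[plugin_obj.name]
        if type(plugin_conf) == "table" and not plugin_conf.disable then
            plugins[#plugins + 1] = plugin_obj
            plugins[#plugins + 1] = plugin_conf
        end
    end
    return plugins
end

-- Calls the handler of the given phase on every plugin that defines one.
local function run_plugin(phase, plugins)
    for i = 1, #plugins, 2 do
        local phase_func = plugins[i][phase]
        if phase_func then
            phase_func(plugins[i + 1])
        end
    end
end

local route = { value = { plugins = {
    auth  = { key = "k1" },
    limit = { rate = 5 },
    trace = { disable = true },   -- configured but disabled
    cache = { ttl = 10 },         -- not loaded locally, so ignored
} } }

local plugins = filter(route)
run_plugin("rewrite", plugins)
run_plugin("access", plugins)
print(#plugins, table.concat(log, ","))
```

Iterating over local_plugins rather than over the route's configuration is what preserves the priority order in the result.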
Load balancing
Let’s look at load balancing:
if up_id then
    local upstream = get_upstream_by_id(up_id)
    api_ctx.matched_upstream = upstream
...
local code, err = set_upstream(route, api_ctx)
if code then
    core.log.error("failed to set upstream: ", err)
    core.response.exit(code)
end

local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
    core.log.error("failed to pick server: ", err)
    return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_headers(api_ctx, server)
There are two ways to obtain the upstream service nodes. One is to write the IP address and port of each upstream node directly into the route.
The other is to pull the available nodes from a service registry. The latter is the recommended way, for the following reasons:
Upstream instances change over time: capacity is scaled as traffic changes, and servers are replaced after hardware faults. If upstream node information is maintained by hand in the gateway configuration, the maintenance cost under a microservices architecture is high. Updates that arrive late affect the business, and the risk of human error cannot be ignored. It is therefore necessary for the gateway to obtain the latest service instance information dynamically from the service registry.
Service discovery initialization was covered above. The official implementations, such as Eureka or Nacos, use a timer, ngx_timer_every(fetch_interval, fetch_full_registry), to pull the available nodes from the registry at regular intervals, which the project describes as "quasi-real time".
load_balancer.pick_server(route, api_ctx) then selects a node; internally, unhealthy nodes are filtered out at this point if health checks are configured.
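As a rough illustration of "filter out unhealthy nodes, then pick", here is a sketch with a plain round-robin picker. It is not how load_balancer.pick_server is implemented: new_picker, healthy_nodes, the checker callback, and the node addresses are all hypothetical, and weights are ignored.

```lua
-- Returns the nodes currently considered healthy; checker is optional.
local function healthy_nodes(nodes, checker)
    if not checker then
        return nodes
    end
    local up = {}
    for _, node in ipairs(nodes) do
        if checker(node) then
            up[#up + 1] = node
        end
    end
    return up
end

-- Hypothetical round-robin picker over the healthy nodes.
local function new_picker(nodes, checker)
    local cursor = 0
    return function()
        local candidates = healthy_nodes(nodes, checker)
        if #candidates == 0 then
            return nil, "no healthy upstream node"
        end
        cursor = cursor % #candidates + 1
        return candidates[cursor]
    end
end

-- Made-up upstream nodes; the second one is marked as down.
local down = { ["10.0.0.2:80"] = true }
local nodes = {
    { host = "10.0.0.1", port = 80 },
    { host = "10.0.0.2", port = 80 },
    { host = "10.0.0.3", port = 80 },
}

local pick = new_picker(nodes, function(node)
    return not down[node.host .. ":" .. node.port]
end)

local picked = {}
for _ = 1, 3 do
    local server = pick()
    picked[#picked + 1] = server.host
    print(server.host)
end
```

Re-evaluating the health filter on every pick means a node that recovers, or one that starts failing, is taken into account on the very next request; when nothing is healthy, the caller gets nil plus an error, which matches the 502 branch in the code above.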