In the last article, we introduced the basic features of Gmqtt and the basic use of hook functions. In this article, we will take a detailed look at the Gmqtt plug-in mechanism and how to write a plug-in.

A review of the Gmqtt — Go language implemented MQTT Broker project address: github.com/DrmagicE/gm…

The lifetime of the session

According to the MQTT protocol specification, each client connection has a session corresponding to it, and the client can specify whether this session needs to be persisted or not. For a persistent session, even if the client is offline, the broker holds the subscription information for it and the messages matching its subscription. When the client is online again, the broker delivers these messages to the client. This ensures that messages will not be lost even when the network is frequently interrupted. Gmqtt provides hook functions to be injected into the session lifecycle, so that the plug-in can manage changes in the session lifecycle.

  • OnBasicAuth received –CONNECTUsed for basic authentication.
  • OnEnhancedAuth — Enhanced authentication (support for V5 protocol)
  • OnConnected — The client is successfully connected
  • OnSessionCreated – The client succeeded in creating a session
  • OnSessionResumed — The client recovers from the session
  • OnSessionTerminated – terminates the session
  • OnClosed — The client is disconnected

Let’s illustrate the position of these hook functions throughout the session lifecycle using basic authentication as an example.

The session to establish

The session delete

A nonpersistent session is deleted when the connection is closed, while a timeout period is set for a persistent session. When the session times out, it is deleted.

For V3.1.1, the client uses the cleanSession field of the CONNECT packet to determine whether a session needs to be persisted. In THE V5 protocol, the cleanSession field is renamed to the cleanStart field, and the client controls the desired session retention time by setting a timeout.

sessionThe deletion process is shown in the figure below:

Topic subscribe/unsubscribe process

The topic subscription and cancellation process involves the following hook functions:

  • OnSubscribe – one of the commonly used hook functions when receivedSUBSCRIBEAfter the packet is triggered, it can be used for permission control and topic rewriting.
  • OnSubscribed – Triggered when a topic is successfully subscribed.
  • OnUnsubscribe – when receivedUNSUBSCRIBEPacket is triggered. Can be used for permission control, theme rewriting and other functions.
  • OnUnsubscribed – Triggered when a topic is unsubscribed.

Subscribe to the process

Unsubscribe process

Message Publishing process

The message publishing process involves the following hook functions:

  • OnMsgArrived – one of the commonly used hook functions when receivedPUBLISHAfter the packet is triggered, it can be used for permission control and message rewriting.
  • OnDelivered – Triggered when a message is delivered to the client. (This post success is from the broker’s point of view and is not guaranteed to be received by the client.)
  • OnMsgDropped – Triggered when a message is dropped. (Possible causes include queue full or message timeout)

wrappermodel

Wrapper mode, also known as wrapper or decorator mode, is a very popular design pattern in the Go language and is often used to implement various middleware middleware. For example, here is a simple HTTP middleware for printing logs:

Type HTTPWrapper func(h http.handlerfunc) http.handlerfunc // LogMiddleware prints logs before and after requests. func LogMiddleware(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, Req * http.request) {log.println (" start processing Request "); Req) log.println (" request completed ")}} func main() {var HDL http.handlerfunc = func(w http.responsewriter, req *http.Request) { w.WriteHeader(200) } http.HandleFunc("/", LogMiddleware(hdl)) http.ListenAndServe(":8080", nil) }Copy the code

Sample output from the above program:

2020/12/20 15:17:03 Processing request 2020/12/20 15:17:03 Processing request 2020/12/20 15:17:03 Processing request completeCopy the code

In Gmqtt, all hook functions have corresponding Wrapper functions. A plug-in needs to declare the wrapper function that the plug-in needs to use. Take the authentication plug-in as an example:

Github.com/DrmagicE/gm…

// HookWrapper returns the wrapper function that Auth needs to care about
func (a *Auth) HookWrapper(a) server.HookWrapper {
	return server.HookWrapper{
		// The Auth authentication plugin only cares about OnBasicAuthWrapper
		OnBasicAuthWrapper: a.OnBasicAuthWrapper,
	}
}
func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
	return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
		// Handle the OnBasicAuth logic of the previous plug-in
		err = pre(ctx, client, req)
		iferr ! =nil {
			return err
		}
		/ /... Handles the authentication logic for this plug-in}}Copy the code

A hook function can be used by multiple plug-ins. The plug-in uses wrapper mode to wrap the hook function and finally inject a wrapped hook function into the Gmqtt mount point. For example, in the code example above, the Auth plug-in “wraps” the hook function of the previous plug-in (pre server.onbasicAuth). The Auth plug-in chooses to execute the hook function of the previous plug-in first. If the previous plug-in returns failure, it returns failure directly, skipping the authentication logic of this plug-in. The Auth plug-in can also choose to execute the authentication logic of this plug-in first:

func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth { return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) { // ... Process the authentication logic of the plug-in. // If the authentication fails, the connection is rejected. // If the verification succeeds, the connection is allowed based on the authentication result of the previous plug-in.Copy the code

Note that in this case Auth controls the previous plugin’s corresponding hook function and is free to inject any logic before and after the previous plugin’s OnBasicAuth execution. That is, although a hook function can be used by more than one plug-in at a time, it has a priority depending on the order in which the plug-in is loaded.

The loading order of plug-ins

The loading order of plug-ins is controlled by plugin_order in the configuration file:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin
Copy the code

Plugin_order holds the name of the plug-in, and the array order represents the order in which the plug-in was loaded. In Gmqtt, the plug-in loaded first has more control. For example, we have three plugins A, B, and C, all using OnBasicAuth hook functions, and their loading order is A->B->C. In the wrapper function OnBasicAuthWrapper(pre server.onBasicAuth) server.OnBasicAuth, Pre wraps the OnBasicAuth implementation of both B and C. The wrapper of plugin B contains the OnBasicAuth implementation of plugin C, while the wrapper of plugin C only contains A default OnBasicAuth implementation specified by Gmqtt. The relationship between the layers of A,B, and C for OnBasicAuth is shown in the following figure:

With the Wrapper pattern, developers can perform a series of controls by combining multiple plug-ins.

How to write plug-ins

As long as the SERVER.Plugin interface of Gmqtt is implemented, it is a Gmqtt plug-in. In order to simplify plug-in development, Gmqtt provides plug-in template generation tool, which can quickly generate plug-in templates through the command line, so that developers can focus on business implementation.

usegmqctlCommand line tool

Install command line tools:

$ go install github.com/DrmagicE/gmqtt/cmd/gmqctl
Copy the code

GMQCTL gen plugin –help GMQCTL gen plugin –help

$ gmqctl gen plugin --help
code generator

Usage:
  gmqctl gen plugin [flags]

Examples:
The following command will generate a code template for the 'awesome' plugin, which makes use of OnBasicAuth and OnSubscribe hook and enables the configuration in ./plugins directory.

gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true -o ./plugins

Flags:
  -c, --config          Whether the plugin needs a configuration.
  -h, --help            help for plugin
  -H, --hooks string    The hooks use by the plugin, multiple hooks are separated by ','
  -n, --name string     The plugin name.
  -o, --output string   The output directory.
Copy the code

We run under the project root of Gmqtt:

gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true
Copy the code

The above command generates the following files in the plugin directory:

├─ ├─ ├─ $tree./plugin/awesome./plugin/ Awesome ├─ │ ├─ config.go #Copy the code

Let’s go file by file, starting with awesome. Go:

package awesome import ( "go.uber.org/zap" "github.com/DrmagicE/gmqtt/config" "github.com/DrmagicE/gmqtt/server" ) var _ Server.plugin = (*Awesome)(nil) const Name = "Awesome" func init() {// Register the constructor of this Plugin. Server.registerplugin (Name, New) // Since we specified -c true, this means that the plugin needs configuration items. // Register the default configuration items here. When the configuration file is configured with the default, enable the default configuration. config.RegisterDefaultPluginConfig(Name, Func New(config config.config) (server.plugin, Error) {panic("implement me")} var log * zap.logger // implement Plugin interface structure. Type Awesome struct {} // Load Is executed by Gmqtt in the import sequence of plug-ins. // Load's main function is to pass the server. server interface to the plug-in. func (a *Awesome) Load(service server.Server) error { log = server.LoggerWithField(zap.String("plugin", Name)) panic(" Implement Me ")} // Unload is called when the broker exits and can do some cleaning. func (a *Awesome) Unload() error { panic("implement me") } func (a *Awesome) Name() string { return Name }Copy the code

And then the config. Go:

Package awesome // Config is the configuration for the awesome plugin. type Config struct { } // Validate validates the configuration, Func (c *Config) Validate() error {// Validate is used to check whether the configuration is valid. // Gmqtt executes the Validate method for each plug-in during the import configuration phase. // If the validation fails, an error is reported and startup stops. Panic ("implement me")} // DefaultConfig is the default configuration. var DefaultConfig = Config{ Use the default Settings. } func(c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { Each plug-in has to customize its own YAML parsing logic. // For details, please refer to other built-in plug-ins. panic("implement me") }Copy the code

Finally, the hook. The go:

package awesome import ( "github.com/DrmagicE/gmqtt/server" ) func (a *Awesome) HookWrapper() server.HookWrapper { return server.HookWrapper{ OnBasicAuthWrapper: a.OnBasicAuthWrapper, OnSubscribeWrapper: // GMQCTL automatically generates the template, so you can fill in the business logic. func (a *Awesome) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth { panic("impermanent me") } func (a *Awesome) OnSubscribeWrapper(pre server.OnSubscribe) server.OnSubscribe { panic("impermanent me") }Copy the code

Import the plug-in and recompile

New plug-ins need to be recompiled to use. Gmqtt unified plugin import file: CMD/GMQTTD /plugins.go:

Package the main import (here / / import all plug-ins (in order to invoke the corresponding init method) _ _ "github.com/DrmagicE/gmqtt/plugin/admin" "github.com/DrmagicE/gmqtt/plugin/auth" _ "github.com/DrmagicE/gmqtt/plugin/prometheus" _ "path/to/your/plugin" )Copy the code

Changing the Boot Sequence

As mentioned above, the startup order of plugins is controlled by the configuration file, and only plugins added in plugin_order are loaded:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin
  - your_plugin
Copy the code

Plug-in Configuration Modification

If the plug-in declares a usage configuration, Gmqtt loads the configuration for it from the configuration file where the plug-in’s configuration is stored as plugin. Plugin name space:

plugins:
  prometheus:
    path: "/metrics"
    listen_address: ":8082"
  admin:
    http:
      enable: true
      addr: :8083
    grpc:
      addr: 8084
  auth:
    # Password hash type. (plain | md5 | sha256 | bcrypt)
    # Default to MD5.
    hash: md5
    # The file to store password. Default to $HOME/gmqtt_password.yml
    # password_file:
Copy the code

At this point, a new plug-in is developed. You can refer to the built-in plug-ins and the project’s Example directory for more examples.

If you are interested in this project, welcome to start support, message exchange.