MQTT (Message Queuing Telemetry Transport) is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT is used today in a wide variety of industries, such as automotive, manufacturing, telecommunications, and oil and gas.

Source: mqtt.org

The mainstream MQTT broker implementations today include EMQ, Mosquitto, and HiveMQ, but none of them is written in Go. The existing open-source Go brokers generally lack complete support for the MQTT protocol, whereas Gmqtt fully implements MQTT V3.1.1 and the latest V5, which probably makes it the most complete MQTT broker written in Go.

Project address: github.com/DrmagicE/gm…

Gmqtt grew out of an earlier project that required a lot of custom business logic inside an MQTT broker. After evaluating several brokers I was not satisfied with any of them, so I rolled up my sleeves and built my own. At first only V3.1.1 was supported, but being a perfectionist, I gave up my gym time, worked late for a while, and now all V5 features are supported.

Quick start

As with any Go project, you can download it with go get.

$ go get -u github.com/DrmagicE/gmqtt
$ cd cmd/gmqttd
$ go run . start -c default_config.yml
2020-12-13T23:11:54.037+0800 INFO server/server.go:996 init plugin hook wrappers
2020-12-13T23:11:54.037+0800 INFO server/server.go:802 open persistence succeeded {"type": "memory"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:825 init session store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:842 init queue store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:843 init subscription store succeeded {"type": "memory", "client_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": "prometheus"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": }
2020-12-13T23:11:54.038+0800 INFO server/server.go:1259 starting GMQTT server {"TCP server listen on": ["[::]:1883"], "websocket server listen on": [":8883"]}

The command above starts Gmqtt with the default configuration file default_config.yml, listening on port 1883 for TCP and on port 8883 for WebSocket. Authentication is not enabled by default, so clients can connect to Gmqtt without credentials.
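To make the TCP listener concrete, here is a minimal sketch of the first packet a V3.1.1 client sends after connecting. It hand-encodes a CONNECT packet with only the standard library. The names buildConnect and encodeString are made up for this sketch and are not part of Gmqtt, and the code assumes the packet body is shorter than 128 bytes so that the remaining length fits in a single byte.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeString returns an MQTT UTF-8 string: a 2-byte big-endian length
// followed by the raw bytes. (Hypothetical helper for this sketch.)
func encodeString(s string) []byte {
	b := make([]byte, 2, 2+len(s))
	binary.BigEndian.PutUint16(b, uint16(len(s)))
	return append(b, s...)
}

// buildConnect builds a minimal MQTT 3.1.1 CONNECT packet with the
// clean-session flag set and no username or password.
// Assumption: the body is shorter than 128 bytes, so the remaining
// length is encoded in one byte.
func buildConnect(clientID string, keepAlive uint16) ([]byte, error) {
	body := encodeString("MQTT")                             // protocol name
	body = append(body, 0x04)                                // protocol level 4 = V3.1.1
	body = append(body, 0x02)                                // connect flags: clean session
	body = append(body, byte(keepAlive>>8), byte(keepAlive)) // keep alive in seconds
	body = append(body, encodeString(clientID)...)           // payload: client identifier
	if len(body) > 127 {
		return nil, errors.New("packet too long for this sketch")
	}
	// Fixed header: packet type CONNECT (0x10), then the remaining length.
	return append([]byte{0x10, byte(len(body))}, body...), nil
}

func main() {
	pkt, err := buildConnect("demo", 60)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("% x\n", pkt)
}
```

Writing these bytes to a connection opened with net.Dial("tcp", "127.0.0.1:1883") should be answered with a CONNACK packet while authentication is disabled, as it is in the default configuration. In practice an MQTT client library would handle this encoding for you.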

Features

Gmqtt is highly extensible: you can customize almost any logic with your own plug-ins, for example by exposing HTTP/gRPC interfaces to query client information, force clients offline, subscribe to topics, or publish messages. This extensibility comes from the rich set of hook functions Gmqtt provides, together with its built-in extension interfaces.

Hook functions

Gmqtt currently provides 17 hook functions.

Hook | Description | Example use
OnAccept | Called when a TCP connection is established | TCP connection rate limiting, blacklists and whitelists, etc.
OnStop | Called when the broker exits |
OnSubscribe | Called when a subscription request is received | Check whether the subscription is allowed
OnSubscribed | Called after a subscription succeeds | Subscription statistics
OnUnsubscribe | Called when an unsubscribe request is received | Check whether unsubscribing is allowed
OnUnsubscribed | Called after unsubscribing succeeds | Subscription statistics
OnMsgArrived | Called when a PUBLISH packet is received | Check publish permissions, rewrite published messages
OnBasicAuth | Called when a CONNECT packet is received | Client connection authentication
OnEnhancedAuth | Called when a CONNECT packet with an authentication method is received (V5 feature) | Client connection authentication
OnReAuth | Called when an AUTH packet is received (V5 feature) | Client connection authentication
OnConnected | Called after a client connects successfully | Counting online clients
OnSessionCreated | Called after a client creates a new session | Counting sessions
OnSessionResumed | Called when a client resumes an old session | Counting sessions
OnSessionTerminated | Called after a session is deleted | Counting sessions
OnDeliver | Called after a message is delivered from the broker to a client |
OnClosed | Called after a client disconnects | Counting online clients
OnMsgDropped | Called when a message is dropped |

Github.com/DrmagicE/gm…

Taking the commonly used OnBasicAuth, OnSubscribe, and OnMsgArrived hooks as examples, let's see how to use hook functions to customize authentication and authorization logic. We store the following six client usernames and passwords in memory.

var validUser = map[string]string{
	"root":           "pwd",
	"qos0":           "pwd", // qos0 users may subscribe with at most QoS 0
	"qos1":           "pwd", // qos1 users may subscribe with at most QoS 1
	"publishonly":    "pwd", // publishonly users may only publish, not subscribe
	"subscribeonly":  "pwd", // subscribeonly users may only subscribe, not publish
	"disable_shared": "pwd", // disable_shared users may not use shared subscriptions (V5 feature)
}

In addition to the per-user permissions above, let's assume that for performance reasons messages may be published with at most QoS 1, and all QoS 2 messages are dropped.

Login authentication

//authentication
var onBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
	username := string(req.Connect.Username)
	password := string(req.Connect.Password)
	// Verify user name and password
	if validateUser(username, password) {
		if username == "disable_shared" {
			// Disable shared subscriptions
			req.Options.SharedSubAvailable = false
		}
		return nil
	}
	// Authentication failed: return the error code that matches the client's protocol version (V3.1.1 and V5 use different codes)
	switch client.Version() {
	case packets.Version5:
		return codes.NewError(codes.BadUserNameOrPassword)
	case packets.Version311:
		return codes.NewError(codes.V3BadUsernameorPassword)
	}
	// Reached only if the version is neither V5 nor V3.1.1
	return nil
}

As you can see, the OnBasicAuth hook gives us the information needed for authentication, such as the username and password. Other information, such as the client ID and IP address, is also available and can be used as authentication parameters. If authentication fails, return the error code defined by MQTT; if it succeeds, return nil.
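The hook above calls validateUser, which the article does not show. A minimal sketch of what it could look like, assuming the in-memory map from earlier and plain-text passwords as in this example (a real deployment would store password hashes instead); the constant-time comparison is my own choice, not something the article specifies:

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// validUser repeats the in-memory user table so this sketch is self-contained.
var validUser = map[string]string{
	"root":           "pwd",
	"qos0":           "pwd",
	"qos1":           "pwd",
	"publishonly":    "pwd",
	"subscribeonly":  "pwd",
	"disable_shared": "pwd",
}

// validateUser reports whether the username exists and the password matches.
// This is an assumed implementation; the original helper is not shown in the article.
func validateUser(username, password string) bool {
	want, ok := validUser[username]
	if !ok {
		return false
	}
	// Compare in constant time so the check does not leak how many bytes matched.
	return subtle.ConstantTimeCompare([]byte(want), []byte(password)) == 1
}

func main() {
	fmt.Println(validateUser("root", "pwd"))
	fmt.Println(validateUser("root", "wrong"))
	fmt.Println(validateUser("nobody", "pwd"))
}
```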

Subscription permission control

// subscription acl
var onSubscribe server.OnSubscribe = func(ctx context.Context, client server.Client, req *server.SubscribeRequest) error {
	// Get the username; the client object is available in almost every hook function
	username := client.ClientOptions().Username
	// Iterate over all subscriptions in the SUBSCRIBE packet
	for k, v := range req.Subscriptions {
		switch username {
		case "root":
			// root may subscribe to anything, so do nothing
		case "qos0":
			// qos0 users are granted QoS 0 only
			req.GrantQoS(k, packets.Qos0)
		case "qos1":
			// qos1 users are granted at most QoS 1
			if v.Sub.QoS > packets.Qos1 {
				req.GrantQoS(k, packets.Qos1)
			}
		case "publishonly":
			// Reject all subscriptions
			req.Reject(k, &codes.Error{
				Code: codes.NotAuthorized,
				ErrorDetails: codes.ErrorDetails{
					ReasonString: []byte("publish only"),
				},
			})
		}
	}
	return nil
}
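One way to keep rules like these easy to unit test without a running broker is to pull the decision into a pure function and call it from the hook. The sketch below does that for the subscription rules above; the decision type and the decide function are hypothetical names for illustration and are not part of Gmqtt.

```go
package main

import "fmt"

// decision is a hypothetical result type for this sketch.
type decision struct {
	Granted  byte // QoS level granted to the subscription
	Rejected bool // true when the subscription is refused outright
}

// decide mirrors the per-user subscription rules from the hook above.
func decide(username string, requested byte) decision {
	switch username {
	case "qos0":
		// Always capped at QoS 0.
		return decision{Granted: 0}
	case "qos1":
		// Capped at QoS 1; lower requests pass through unchanged.
		if requested > 1 {
			return decision{Granted: 1}
		}
	case "publishonly":
		// May not subscribe at all.
		return decision{Rejected: true}
	}
	// root and any other user keep the requested QoS.
	return decision{Granted: requested}
}

func main() {
	for _, user := range []string{"root", "qos0", "qos1", "publishonly"} {
		fmt.Printf("%s requesting QoS 2: %+v\n", user, decide(user, 2))
	}
}
```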

Publishing permission control

var onMsgArrived server.OnMsgArrived = func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
	version := client.Version()
	if client.ClientOptions().Username == "subscribeonly" {
		switch version {
		case packets.Version311:
			// For V3.1.1, if the server does not authorize a client's PUBLISH, it has no way of
			// informing the client. It must either acknowledge the message normally
			// or close the network connection [MQTT-3.3.5-2].
			// Here we drop the message.
			req.Drop()
			// Or close the client instead:
			// client.Close()
			return nil
		case packets.Version5:
			// V5 introduces reason codes in acknowledgements, so we can reply with
			// an error code to tell the client that it has no permission.
			return &codes.Error{
				Code: codes.NotAuthorized,
			}
			// Alternatively, you can still be strict and close the client. Since V5 lets
			// the server send a DISCONNECT packet to the client, client.Disconnect()
			// is a better choice than client.Close() for V5 clients.
			//req.Drop()
			//client.Disconnect(&packets.Disconnect{
			//	Version: packets.Version5,
			//	Code:    codes.UnspecifiedError,
			//})
			//return
		}
	}
	// Drop all QoS 2 messages
	if req.Message.QoS == packets.Qos2 {
		req.Drop()
		return &codes.Error{
			Code: codes.NotAuthorized,
			ErrorDetails: codes.ErrorDetails{
				ReasonString: []byte("not authorized"),
				UserProperties: []struct {
					K []byte
					V []byte
				}{
					{
						K: []byte("user property key"),
						V: []byte("user property value"),
					},
				},
			},
		}
	}
	return nil
}

The full code above can be found here: github.com/DrmagicE/gm…

Extension interface

Github.com/DrmagicE/gm…

type Server interface {
	// Publisher enables the user to send MQTT messages to the broker
	Publisher() Publisher
	// GetConfig returns the current configuration
	GetConfig() config.Config
	// StatsManager returns the statistics reader
	StatsManager() StatsReader
	// ClientService provides operations such as querying clients, forcing them offline, and forcibly clearing sessions
	ClientService() ClientService
	// SubscriptionService allows you to add, delete, change, or query subscriptions
	SubscriptionService() SubscriptionService
	// RetainedService returns the retained-message service
	RetainedService() RetainedService
}

Gmqtt exposes the interfaces above as extension points, and they are typically called by plug-ins. Through them, plain function calls can publish messages to the broker, query, add, modify, or delete client connections, and so on.

Plug-in mechanism

Building on the hook functions and extension interfaces, developers can write plug-ins to flexibly extend Gmqtt's capabilities. Gmqtt currently ships with three built-in plug-ins: auth for authentication, prometheus for monitoring, and admin for the management API. The plug-in interfaces are defined in plugin.go:

// HookWrapper groups all hook wrapper functions
type HookWrapper struct {
	OnBasicAuthWrapper         OnBasicAuthWrapper
	OnEnhancedAuthWrapper      OnEnhancedAuthWrapper
	OnConnectedWrapper         OnConnectedWrapper
	OnReAuthWrapper            OnReAuthWrapper
	OnSessionCreatedWrapper    OnSessionCreatedWrapper
	OnSessionResumedWrapper    OnSessionResumedWrapper
	OnSessionTerminatedWrapper OnSessionTerminatedWrapper
	OnSubscribeWrapper         OnSubscribeWrapper
	OnSubscribedWrapper        OnSubscribedWrapper
	OnUnsubscribeWrapper       OnUnsubscribeWrapper
	OnUnsubscribedWrapper      OnUnsubscribedWrapper
	OnMsgArrivedWrapper        OnMsgArrivedWrapper
	OnMsgDroppedWrapper        OnMsgDroppedWrapper
	OnDeliverWrapper           OnDeliverWrapper
	OnCloseWrapper             OnCloseWrapper
	OnAcceptWrapper            OnAcceptWrapper
	OnStopWrapper              OnStopWrapper
}

// NewPlugin is the constructor of the plug-in
type NewPlugin func(config config.Config) (Plugable, error)

// Plugin is the interface that all plug-ins need to implement
type Plugin interface {
	// Load is called during server startup. The server passes the extension interface to the plug-in through this method so that the plug-in can call it.
	Load(service Server) error
	// Unload is called when the server exits, so the plug-in can do some cleanup.
	Unload() error
	// HookWrapper returns the hook functions that the plug-in needs to register with the broker, or an empty structure if the plug-in does not need to register any hook functions.
	HookWrapper() HookWrapper
	// Name returns the name of the plug-in.
	Name() string
}

Until detailed plug-in documentation is available, refer to the built-in admin and prometheus plug-ins to see how a plug-in is implemented.
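The "wrapper" naming in HookWrapper suggests a middleware-style design, where each plug-in wraps the hook registered before it. The sketch below illustrates that pattern under the assumption that a wrapper has the shape func(next Hook) Hook; the article does not show the wrapper type definitions. The OnBasicAuth type here is a simplified stand-in with a shorter signature than the real hook, and chain, denyEmptyPassword, and countCalls are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// OnBasicAuth and OnBasicAuthWrapper are simplified stand-ins for this sketch;
// the real Gmqtt hook takes a context, a client, and a connect request.
type OnBasicAuth func(username, password string) error
type OnBasicAuthWrapper func(next OnBasicAuth) OnBasicAuth

// chain composes wrappers so that the first wrapper in the list runs first.
func chain(base OnBasicAuth, wrappers ...OnBasicAuthWrapper) OnBasicAuth {
	h := base
	for i := len(wrappers) - 1; i >= 0; i-- {
		h = wrappers[i](h)
	}
	return h
}

var errDenied = errors.New("bad username or password")

// denyEmptyPassword rejects empty passwords before calling the next hook.
func denyEmptyPassword(next OnBasicAuth) OnBasicAuth {
	return func(username, password string) error {
		if password == "" {
			return errDenied
		}
		return next(username, password)
	}
}

// countCalls returns a wrapper that counts how often the hook is invoked.
func countCalls(counter *int) OnBasicAuthWrapper {
	return func(next OnBasicAuth) OnBasicAuth {
		return func(username, password string) error {
			*counter++
			return next(username, password)
		}
	}
}

func main() {
	calls := 0
	allowAll := func(username, password string) error { return nil }
	auth := chain(allowAll, countCalls(&calls), denyEmptyPassword)

	fmt.Println("with password:", auth("root", "pwd"))
	fmt.Println("empty password:", auth("root", ""))
	fmt.Println("hook invocations:", calls)
}
```

With this shape, a plug-in that only cares about one hook fills in a single wrapper field and leaves the rest empty, which matches the note in the Plugin interface that HookWrapper may return an empty structure.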

Session persistence

Gmqtt uses in-memory storage by default, which is also the recommended option. In-memory storage offers excellent performance, but the drawback is that session information is lost when the broker restarts.

If you want sessions to survive a restart, you can configure Redis as the persistent store:

persistence:
  type: redis # memory or redis
  redis:
    # redis address
    addr: "127.0.0.1:6379"
    # Maximum number of idle connections in the connection pool
    max_idle: 1000
    # Maximum number of active connections in the pool. 0 indicates no limit.
    max_active: 0
    # Idle connections are closed after this timeout
    idle_timeout: 240s
    password: ""
    database: 0
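As a rough illustration of what these settings mean, the sketch below models the redis block as a Go struct and sanity-checks the values with the standard library. The RedisConfig type and its Validate method are hypothetical and only mirror the YAML keys; they are not Gmqtt's actual configuration types.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// RedisConfig mirrors the keys of the YAML fragment above.
// Hypothetical type for this sketch, not Gmqtt's own config struct.
type RedisConfig struct {
	Addr        string // host:port of the Redis server
	MaxIdle     int    // maximum idle connections in the pool
	MaxActive   int    // maximum active connections, 0 means no limit
	IdleTimeout string // duration string such as "240s"
	Password    string
	Database    int
}

// Validate checks the values and returns the parsed idle timeout.
func (c RedisConfig) Validate() (time.Duration, error) {
	if _, _, err := net.SplitHostPort(c.Addr); err != nil {
		return 0, fmt.Errorf("addr: %w", err)
	}
	if c.MaxIdle < 0 || c.MaxActive < 0 || c.Database < 0 {
		return 0, errors.New("max_idle, max_active and database must not be negative")
	}
	d, err := time.ParseDuration(c.IdleTimeout)
	if err != nil {
		return 0, fmt.Errorf("idle_timeout: %w", err)
	}
	return d, nil
}

func main() {
	cfg := RedisConfig{
		Addr:        "127.0.0.1:6379",
		MaxIdle:     1000,
		MaxActive:   0,
		IdleTimeout: "240s",
	}
	d, err := cfg.Validate()
	fmt.Println(d, err)
}
```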

Limitations

  • Cluster mode is not yet supported. (Planned as the next step.)

If you are interested in this project, please feel free to support it with a star.