MQTT (Message Queuing Telemetry Transport) is an OASIS standard messaging protocol for the Internet of Things (IoT). Its publish/subscribe model is ideal for connecting remote devices because it needs very little network bandwidth. MQTT is used today in a wide variety of industries, such as automotive, manufacturing, telecommunications, and oil and gas.
Source: mqtt.org/
The mainstream MQTT brokers include EMQ, Mosquitto, and HiveMQ, but until now there has been no complete implementation in Go. The existing open source Go implementations mostly support only part of the MQTT protocol, whereas Gmqtt fully implements MQTT V3.1.1 and the latest V5, which probably makes it the most complete MQTT broker written in Go.
Project address: github.com/DrmagicE/gm…
Gmqtt was born out of an earlier project that required a lot of custom business logic inside an MQTT broker. After evaluating several brokers and finding none of them satisfactory, I rolled up my sleeves and built my own. At first only V3.1.1 was supported, but in the pursuit of completeness (I am a bit obsessive about these things), I gave up my gym time, worked intensively for a while, and now all V5 features are supported.
Quick start
As with any Go project, you can download it with go get.
$ go get -u github.com/DrmagicE/gmqtt
$ cd cmd/gmqttd
$ go run . start -c default_config.yml
2020-12-13T23:11:54.037+0800 INFO server/server.go:996 init plugin hook wrappers
2020-12-13T23:11:54.037+0800 INFO server/server.go:802 open persistence succeeded {"type": "memory"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:825 init session store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:842 init queue store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:843 init subscription store succeeded {"type": "memory", "client_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": "prometheus"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": }
2020-12-13T23:11:54.038+0800 INFO server/server.go:1259 starting gmqtt server {"tcp server listen on": ["[::]:1883"], "websocket server listen on": [":8883"]}
The command above starts Gmqtt with the default configuration file default_config.yml, serving TCP on port 1883 and WebSocket on port 8883. Authentication is not enabled by default, so clients can connect to Gmqtt without credentials.
Features
Gmqtt is highly extensible: almost any logic can be customized through plugins. For example, a plugin can expose HTTP/gRPC interfaces to query client information, force disconnections, subscribe to topics, publish messages, and so on. This extensibility comes from the rich set of hook functions that Gmqtt provides, together with its built-in extension interface.
Hook functions
Gmqtt currently provides 17 hook functions.
Hook | Description | Example use |
---|---|---|
OnAccept | Called when a TCP connection is established | TCP connection rate limiting, blacklists and whitelists, etc. |
OnStop | Called when the broker exits | |
OnSubscribe | Called when a subscription request is received | Verify that the subscription is allowed |
OnSubscribed | Called after a subscription succeeds | Count subscriptions |
OnUnsubscribe | Called when an unsubscribe request is received | Verify that unsubscribing is allowed |
OnUnsubscribed | Called after an unsubscribe succeeds | Count subscriptions |
OnMsgArrived | Called when a PUBLISH packet is received | Verify publish permissions, rewrite published messages |
OnBasicAuth | Called when a CONNECT packet is received | Client connection authentication |
OnEnhancedAuth | Called when a CONNECT packet with an AuthMethod is received (V5 feature) | Client connection authentication |
OnReAuth | Called when an AUTH packet is received (V5 feature) | Client connection authentication |
OnConnected | Called after a client connects successfully | Count online clients |
OnSessionCreated | Called after a client creates a new session | Count sessions |
OnSessionResumed | Called when a client resumes an old session | Count sessions |
OnSessionTerminated | Called after a session is deleted | Count sessions |
OnDeliver | Called after a message is delivered from the broker to a client | |
OnClosed | Called after a client disconnects | Count online clients |
OnMsgDropped | Called when a message is dropped | |
Github.com/DrmagicE/gm…
Taking the commonly used OnBasicAuth, OnSubscribe, and OnMsgArrived hooks as examples, this section shows how to use hook functions to implement custom authentication and authorization logic. We store the following six client usernames and passwords in memory.
var validUser = map[string]string{
	"root":           "pwd",
	"qos0":           "pwd", // qos0 users may only subscribe at QoS 0
	"qos1":           "pwd", // qos1 users may subscribe at QoS 1 at most
	"publishonly":    "pwd", // publishonly users may only publish, not subscribe
	"subscribeonly":  "pwd", // subscribeonly users may only subscribe, not publish
	"disable_shared": "pwd", // disable_shared users may not use shared subscriptions (V5 feature)
}
In addition to the per-user permissions above, let's assume that, for performance reasons, messages may be published at QoS 1 at most, and all QoS 2 messages are dropped.
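The authentication hook in the next section calls a validateUser helper that is not shown in this article. A minimal sketch of what such a helper could look like is given here; the password values are placeholders, and the constant-time comparison is my own choice rather than something taken from the Gmqtt example code.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// validUser maps usernames to passwords. The password values are placeholders.
var validUser = map[string]string{
	"root":           "pwd",
	"qos0":           "pwd",
	"qos1":           "pwd",
	"publishonly":    "pwd",
	"subscribeonly":  "pwd",
	"disable_shared": "pwd",
}

// validateUser reports whether the username is known and the password matches.
// This is a hypothetical helper, not the one from the Gmqtt repository.
func validateUser(username, password string) bool {
	want, ok := validUser[username]
	if !ok {
		return false
	}
	// A constant-time comparison avoids leaking how many leading bytes matched.
	return subtle.ConstantTimeCompare([]byte(want), []byte(password)) == 1
}

func main() {
	fmt.Println(validateUser("root", "pwd"))   // true
	fmt.Println(validateUser("root", "wrong")) // false
	fmt.Println(validateUser("nobody", "pwd")) // false
}
```

In a real deployment the map would normally hold password hashes loaded from a database or configuration file rather than plain-text literals.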
Login authentication
// authentication
var onBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
	username := string(req.Connect.Username)
	password := string(req.Connect.Password)
	// Verify the username and password
	if validateUser(username, password) {
		if username == "disable_shared" {
			// Disable shared subscriptions for this client
			req.Options.SharedSubAvailable = false
		}
		return nil
	}
	// Authentication failed: return the error code that matches the client's
	// protocol version (V3.1.1 and V5 use different codes)
	switch client.Version() {
	case packets.Version5:
		return codes.NewError(codes.BadUserNameOrPassword)
	case packets.Version311:
		return codes.NewError(codes.V3BadUsernameorPassword)
	}
	// Any other protocol version falls through and returns nil
	return nil
}
As you can see, the OnBasicAuth hook gives us the information needed for authentication, such as the username and password. Other information, such as the client ID and IP address, is also available and can be used as authentication input. If authentication fails, return the error code defined by MQTT; if it succeeds, return nil.
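To make it concrete where the username and password come from, here is a small sketch that encodes an MQTT 3.1.1 CONNECT packet by hand, following the packet layout in the MQTT 3.1.1 specification. The function names (buildConnect, appendString, appendRemainingLength) and the sample client ID are my own, not part of Gmqtt; in practice an MQTT client library would do this encoding.

```go
package main

import "fmt"

// appendString appends an MQTT length-prefixed field:
// a 2-byte big-endian length followed by the bytes themselves.
func appendString(b []byte, s string) []byte {
	b = append(b, byte(len(s)>>8), byte(len(s)))
	return append(b, s...)
}

// appendRemainingLength appends n using MQTT's variable-length encoding:
// 7 bits per byte, with the high bit set when more bytes follow.
func appendRemainingLength(b []byte, n int) []byte {
	for {
		digit := byte(n % 128)
		n /= 128
		if n > 0 {
			digit |= 0x80
		}
		b = append(b, digit)
		if n == 0 {
			return b
		}
	}
}

// buildConnect encodes an MQTT 3.1.1 CONNECT packet with the clean session,
// username, and password flags set.
func buildConnect(clientID, username, password string, keepAlive uint16) []byte {
	var body []byte
	body = appendString(body, "MQTT")   // protocol name
	body = append(body, 4)              // protocol level 4 = MQTT 3.1.1
	body = append(body, 0x80|0x40|0x02) // flags: username, password, clean session
	body = append(body, byte(keepAlive>>8), byte(keepAlive))
	body = appendString(body, clientID) // payload: client identifier
	body = appendString(body, username) // payload: user name
	body = appendString(body, password) // payload: password

	pkt := []byte{0x10} // fixed header: packet type CONNECT
	pkt = appendRemainingLength(pkt, len(body))
	return append(pkt, body...)
}

func main() {
	pkt := buildConnect("demo", "root", "pwd", 60)
	fmt.Printf("% x\n", pkt)
}
```

Writing these bytes to a TCP connection on the broker's port 1883 is what should lead the broker to invoke OnBasicAuth, with the user name and password fields presumably surfacing as req.Connect.Username and req.Connect.Password.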
Subscription permission control
// subscription ACL
var onSubscribe server.OnSubscribe = func(ctx context.Context, client server.Client, req *server.SubscribeRequest) error {
	// Get the username. Client information like this is available in almost all hook functions.
	username := client.ClientOptions().Username
	// Iterate over all subscriptions in the SUBSCRIBE packet
	for k, v := range req.Subscriptions {
		switch username {
		case "root":
			// root may subscribe to anything, so leave the subscription unchanged
		case "qos0":
			// Grant QoS 0 at most
			req.GrantQoS(k, packets.Qos0)
		case "qos1":
			// Grant QoS 1 at most
			if v.Sub.QoS > packets.Qos1 {
				req.GrantQoS(k, packets.Qos1)
			}
		case "publishonly":
			// Reject all subscriptions
			req.Reject(k, &codes.Error{
				Code: codes.NotAuthorized,
				ErrorDetails: codes.ErrorDetails{
					ReasonString: []byte("publish only"),
				},
			})
		}
	}
	return nil
}
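The per-user subscription rules in the hook above boil down to a small decision function. The following sketch restates them without any Gmqtt types so that they can be run and tested in isolation; the decision struct and decideSubscription name are illustrative only.

```go
package main

import "fmt"

// decision is the outcome for one requested subscription.
type decision struct {
	Granted  byte // QoS level granted; only meaningful when Rejected is false
	Rejected bool // true when the subscription is refused outright
}

// decideSubscription applies the article's per-user rules to a single
// topic filter requested at requestedQoS.
func decideSubscription(username string, requestedQoS byte) decision {
	switch username {
	case "qos0":
		// Always downgrade to QoS 0.
		return decision{Granted: 0}
	case "qos1":
		// Cap the granted QoS at 1.
		if requestedQoS > 1 {
			return decision{Granted: 1}
		}
	case "publishonly":
		// This user may not subscribe at all.
		return decision{Rejected: true}
	}
	// root and every other user keep the QoS they asked for.
	return decision{Granted: requestedQoS}
}

func main() {
	for _, user := range []string{"root", "qos0", "qos1", "publishonly"} {
		fmt.Printf("%-12s requested QoS 2 -> %+v\n", user, decideSubscription(user, 2))
	}
}
```

Keeping the policy in a pure function like this makes it easy to unit test; the hook itself then only has to translate the result into req.GrantQoS or req.Reject calls.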
Publishing permission control
var onMsgArrived server.OnMsgArrived = func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
	version := client.Version()
	if client.ClientOptions().Username == "subscribeonly" {
		switch version {
		case packets.Version311:
			// For V3.1.1, the server has no way to tell the client that a publish
			// is not allowed. It must either acknowledge the message normally
			// or close the network connection [MQTT-3.3.5-2].
			// Here we drop the message.
			req.Drop()
			// Or close the client:
			// client.Close()
			return nil
		case packets.Version5:
			// V5 introduces reason codes in acknowledgements, so we can reply
			// with an error code to tell the client that it has no permission.
			return &codes.Error{
				Code: codes.NotAuthorized,
			}
			// Alternatively, be strict and close the client. Because V5 lets the
			// server send a DISCONNECT packet to the client, client.Disconnect()
			// is a better choice than client.Close() for V5:
			// req.Drop()
			// client.Disconnect(&packets.Disconnect{
			// 	Version: packets.Version5,
			// 	Code:    codes.UnspecifiedError,
			// })
			// return
		}
	}
	if req.Message.QoS == packets.Qos2 {
		req.Drop()
		return &codes.Error{
			Code: codes.NotAuthorized,
			ErrorDetails: codes.ErrorDetails{
				ReasonString: []byte("not authorized"),
				UserProperties: []struct {
					K []byte
					V []byte
				}{
					{
						K: []byte("user property key"),
						V: []byte("user property value"),
					},
				},
			},
		}
	}
	return nil
}
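The publish rules above can likewise be summarized as a pure function of the username, protocol version, and QoS. This is only a sketch of the decision logic with names of my own choosing; it does not model how the broker turns a returned error into an acknowledgement for a V3.1.1 client.

```go
package main

import "fmt"

// MQTT protocol level bytes as carried in the CONNECT packet.
const (
	version311 byte = 4
	version5   byte = 5
)

// action is what the broker should do with an incoming PUBLISH.
type action int

const (
	allow               action = iota // deliver the message as usual
	dropSilently                      // drop it, but acknowledge per the normal QoS rules
	rejectNotAuthorized               // drop it and answer with "Not authorized"
)

func (a action) String() string {
	return [...]string{"allow", "drop silently", "reject: not authorized"}[a]
}

// decidePublish mirrors the branches of the onMsgArrived hook.
func decidePublish(username string, version, qos byte) action {
	if username == "subscribeonly" {
		switch version {
		case version311:
			// V3.1.1 has no way to signal the refusal to the client.
			return dropSilently
		case version5:
			// V5 acknowledgements can carry a reason code.
			return rejectNotAuthorized
		}
	}
	// QoS 2 publishing is disabled for everyone.
	if qos == 2 {
		return rejectNotAuthorized
	}
	return allow
}

func main() {
	fmt.Println(decidePublish("subscribeonly", version311, 1))
	fmt.Println(decidePublish("subscribeonly", version5, 1))
	fmt.Println(decidePublish("root", version5, 2))
	fmt.Println(decidePublish("root", version311, 1))
}
```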
The full code above can be found here: github.com/DrmagicE/gm…
Extension interface
Github.com/DrmagicE/gm…
type Server interface {
	// Publisher lets the user send MQTT messages to the broker
	Publisher() Publisher
	// GetConfig returns the current configuration
	GetConfig() config.Config
	// StatsManager returns the StatsReader
	StatsManager() StatsReader
	// ClientService provides operations such as querying clients, forcing them offline, and forcibly clearing sessions
	ClientService() ClientService
	// SubscriptionService provides operations to add, delete, modify, and query subscriptions
	SubscriptionService() SubscriptionService
	// RetainedService provides access to retained messages
	RetainedService() RetainedService
}
Gmqtt exposes the interfaces above to provide extension capabilities; they are typically called by plugins. Through them, ordinary function calls can publish messages to the broker, query and manage client connections, manage subscriptions, and so on.
Plugin mechanism
Building on the hook functions and the extension interface, developers can write plugins to extend Gmqtt flexibly. Gmqtt currently ships with three built-in plugins: auth (authentication), prometheus (monitoring), and admin (management API). The plugin-related interfaces are defined in plugin.go:
// HookWrapper groups all hook wrapper functions
type HookWrapper struct {
	OnBasicAuthWrapper         OnBasicAuthWrapper
	OnEnhancedAuthWrapper      OnEnhancedAuthWrapper
	OnConnectedWrapper         OnConnectedWrapper
	OnReAuthWrapper            OnReAuthWrapper
	OnSessionCreatedWrapper    OnSessionCreatedWrapper
	OnSessionResumedWrapper    OnSessionResumedWrapper
	OnSessionTerminatedWrapper OnSessionTerminatedWrapper
	OnSubscribeWrapper         OnSubscribeWrapper
	OnSubscribedWrapper        OnSubscribedWrapper
	OnUnsubscribeWrapper       OnUnsubscribeWrapper
	OnUnsubscribedWrapper      OnUnsubscribedWrapper
	OnMsgArrivedWrapper        OnMsgArrivedWrapper
	OnMsgDroppedWrapper        OnMsgDroppedWrapper
	OnDeliverWrapper           OnDeliverWrapper
	OnCloseWrapper             OnCloseWrapper
	OnAcceptWrapper            OnAcceptWrapper
	OnStopWrapper              OnStopWrapper
}

// NewPlugin is the constructor of a plugin
type NewPlugin func(config config.Config) (Plugin, error)

// Plugin is the interface that every plugin must implement
type Plugin interface {
	// Load is called during server startup. The extension interface (Server) is
	// passed to the plugin through this method so that the plugin can call it.
	Load(service Server) error
	// Unload is called when the server exits, so the plugin can do some cleanup.
	Unload() error
	// HookWrapper returns the hook functions that the plugin wants to register with the broker,
	// or an empty struct if the plugin does not register any hooks.
	HookWrapper() HookWrapper
	// Name returns the name of the plugin.
	Name() string
}
Until detailed plugin documentation is available, refer to the built-in admin and prometheus plugins for how to implement a plugin.
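To illustrate the overall shape of a plugin, here is a self-contained sketch. It uses simplified stand-in types instead of the real Gmqtt ones so that it compiles on its own: the hook signature is reduced to a username and password, and the wrapper is assumed to follow the usual middleware shape func(next Hook) Hook. The denyList plugin and the chain helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the Gmqtt types, reduced so this file compiles on its own.
// The real hooks also receive a context, the client, and a request object.
type OnBasicAuth func(username, password string) error
type OnBasicAuthWrapper func(next OnBasicAuth) OnBasicAuth

type HookWrapper struct {
	OnBasicAuthWrapper OnBasicAuthWrapper
}

// Server stands in for the extension interface (Publisher(), ClientService(), ...).
type Server interface{}

type Plugin interface {
	Load(service Server) error
	Unload() error
	HookWrapper() HookWrapper
	Name() string
}

// denyList is a hypothetical plugin that rejects a fixed set of usernames
// before the next authentication hook runs.
type denyList struct {
	blocked map[string]bool
}

func (d *denyList) Load(Server) error { return nil }
func (d *denyList) Unload() error     { return nil }
func (d *denyList) Name() string      { return "denylist" }

func (d *denyList) HookWrapper() HookWrapper {
	return HookWrapper{
		OnBasicAuthWrapper: func(next OnBasicAuth) OnBasicAuth {
			return func(username, password string) error {
				if d.blocked[username] {
					return errors.New("user is blocked")
				}
				return next(username, password)
			}
		},
	}
}

// chain wraps base with each plugin's OnBasicAuthWrapper.
// Plugins earlier in the list end up outermost, so they run first.
func chain(base OnBasicAuth, plugins ...Plugin) OnBasicAuth {
	h := base
	for i := len(plugins) - 1; i >= 0; i-- {
		if w := plugins[i].HookWrapper().OnBasicAuthWrapper; w != nil {
			h = w(h)
		}
	}
	return h
}

func main() {
	base := func(username, password string) error { return nil } // accepts everyone
	auth := chain(base, &denyList{blocked: map[string]bool{"mallory": true}})
	fmt.Println(auth("root", "pwd"))
	fmt.Println(auth("mallory", "pwd"))
}
```

The wrapper pattern lets several plugins contribute to the same hook without knowing about each other: each one decides whether to call next or to short-circuit with an error.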
Session persistence
Gmqtt uses in-memory storage by default, which is also the recommended mode. In-memory storage performs very well, but session information is lost when the broker restarts.
If sessions must survive a restart, configure Redis as the persistent store:
persistence:
  type: redis # memory or redis
  redis:
    # Redis address
    addr: "127.0.0.1:6379"
    # Maximum number of idle connections in the connection pool
    max_idle: 1000
    # Maximum number of active connections in the pool. 0 means no limit.
    max_active: 0
    # Idle connections are closed after this timeout
    idle_timeout: 240s
    password: ""
    database: 0
Limitations
- Cluster mode is not yet supported. (Planned as the next step.)
If you are interested in this project, feel free to support it with a star.