1. Introduction
mica-mqtt is a simple, low-latency, high-performance open-source MQTT IoT component built on t-io. For usage details, see the mica-mqtt-example module in the mica-mqtt source code on Gitee.
After several friends asked about mica-mqtt clustering, I added a mica-mqtt-broker module to demonstrate a cluster implementation based on Redis Pub/Sub.
2. Features
- Supports the MQTT v3.1, v3.1.1, and v5.0 protocols.
- Supports the MQTT-over-WebSocket sub-protocol (works with mqtt.js).
- Supports an HTTP REST API; see the HTTP API documentation for details.
- Supports an MQTT client.
- Supports an MQTT server.
- Supports MQTT will messages.
- Supports MQTT retained messages.
- Supports custom message (MQ) processing and forwarding to implement clustering.
- Includes a demo of the MQTT client connecting to Aliyun MQTT.
- Supports compiling to a native executable with GraalVM.
- Supports quick integration into Spring Boot projects (mica-mqtt-spring-boot-starter).
- mica-mqtt-spring-boot-starter integrates with Prometheus + Grafana.
- Implements clustering based on Redis Pub/Sub; see the mica-mqtt-broker module for details.
3. To do
- Optimize the handling of MQTT sessions and support some of the new MQTT v5.0 features.
4. Changelog
- ✨ Added the mica-mqtt-broker module, which implements MQTT clustering based on Redis Pub/Sub.
- ✨ mica-mqtt-broker implements client state storage based on Redis.
- ✨ mica-mqtt-broker implements will and retained message storage based on Redis.
- ✨ The mqtt-server HTTP API adjusts subscribe and unsubscribe to make cluster processing easier.
- ✨ mica-mqtt-spring-boot-example adds an example of MQTT and HTTP API authentication.
- ✨ Added all MQTT 5 ReasonCodes.
- ✨ Optimized the PacketNeededLength calculation during decoding.
- 🐛 Fixed will messages and added the message type.
- 🐛 Fixed the mqtt-server retained message matching rules.
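The retained-message matching fix listed above concerns how a subscription filter containing wildcards is compared with a concrete topic name. The following is a minimal sketch of the wildcard rules defined by the MQTT specification (`+` matches exactly one level, `#` matches the remaining levels). It is not mica-mqtt's actual implementation, and the class name `TopicFilterMatcher` is hypothetical.

```java
/** Hypothetical helper illustrating MQTT topic filter matching; not part of mica-mqtt. */
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    /** Returns true if the concrete topic name is matched by the subscription filter. */
    static boolean matches(String filter, String topic) {
        // Per the MQTT spec, filters starting with a wildcard do not match topics starting with '$'.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        // A limit of -1 keeps trailing empty levels, so "a/" is two levels: "a" and "".
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // '#' is only valid as the last level; it matches the parent and all children.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false; // a literal level differs
            }
        }
        // Without a trailing '#', both sides must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }
}
```

With this sketch, `TopicFilterMatcher.matches("/test/#", "/test/client")` is true, while `TopicFilterMatcher.matches("sport/+", "sport/tennis/player1")` is false because `+` covers only a single level.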
5. Quick integration with Spring Boot
5.1 Adding the dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.1.2</version>
</dependency>
5.2 Server Configuration Example
mqtt:
  server:
    enabled: true                # whether to enable the server, default: true
    ip: 127.0.0.1                # server IP address, default: 127.0.0.1
    port: 5883                   # port, default: 1883
    name: Mica-Mqtt-Server       # name, default: mica-mqtt-server
    buffer-allocator: HEAP       # heap or off-heap memory, default: heap memory
    heartbeat-timeout: 120000    # heartbeat timeout in milliseconds, default: 1000 * 120
    read-buffer-size: 8092       # buffer size for receiving data, default: 8092
    max-bytes-in-message: 8092   # maximum number of bytes when parsing a message, default: 8092
    debug: true                  # consider turning this off if Prometheus metrics collection is enabled
    websocket-enable: true       # whether to enable the WebSocket sub-protocol, enabled by default
    websocket-port: 8083         # WebSocket port, default: 8083
5.3 Interfaces the server lets you implement (register as Spring beans)
Interface | Required | Description |
---|---|---|
IMqttServerAuthHandler | Yes | Client authentication |
IMqttMessageListener | Yes | Message listener |
IMqttConnectStatusListener | Yes | Connection status listener |
IMqttSessionManager | No | Session management |
IMqttMessageStore | Yes in a cluster, no on a single node | Storage for will and retained messages |
AbstractMqttMessageDispatcher | Yes in a cluster, no on a single node | Message forwarding (will and retained message forwarding) |
IpStatListener | No | t-io IP status listener |
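To make the authentication row in the table above more concrete, here is a minimal sketch of credential checking. It assumes a callback with the same `(clientId, userName, password) -> boolean` shape as the `authHandler` lambda used in the server example of this article. `AuthHandlerSketch` is a hypothetical local interface, not the real `IMqttServerAuthHandler`; the exact method signature of the real interface should be checked against the starter's documentation before adapting this.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an authentication callback; not the mica-mqtt interface itself. */
@FunctionalInterface
interface AuthHandlerSketch {
    boolean authenticate(String clientId, String userName, String password);
}

/** Checks credentials against an in-memory user table (illustration only). */
final class InMemoryAuthHandler implements AuthHandlerSketch {

    private final Map<String, String> credentials;

    InMemoryAuthHandler(Map<String, String> credentials) {
        // Defensive copy so later changes to the caller's map do not affect authentication.
        this.credentials = new HashMap<>(credentials);
    }

    @Override
    public boolean authenticate(String clientId, String userName, String password) {
        if (userName == null || password == null) {
            return false; // anonymous clients are rejected in this sketch
        }
        String expected = credentials.get(userName);
        if (expected == null) {
            return false; // unknown user
        }
        // MessageDigest.isEqual compares byte arrays for equality.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a Spring Boot project, the equivalent class would implement the real interface from the table and be registered as a bean. A production handler would normally compare against hashed passwords rather than plain text.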
5.4 Prometheus + Grafana monitoring integration
Thanks to t-io's good design, the monitoring metrics are taken directly from t-io's stat data. The following metrics are currently supported, and more will be added in the future.
Metric | Description |
---|---|
mqtt_connections_accepted | Total number of connections accepted |
mqtt_connections_closed | Number of closed connections |
mqtt_connections_size | Current number of connections |
mqtt_messages_handled_packets | Number of messages handled |
mqtt_messages_handled_bytes | Number of message bytes handled |
mqtt_messages_received_packets | Number of messages received |
mqtt_messages_received_bytes | Number of message bytes received |
mqtt_messages_send_packets | Number of messages sent |
mqtt_messages_send_bytes | Number of message bytes sent |
For more information about mica-mqtt-spring-boot-starter, see the documentation: gitee.com/596392912/m…
6. Integration with a plain Java project
6.1 Maven dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.1.2</version>
</dependency>
6.2 mica-mqtt client
// Initialize the MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)                     // Default: 1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)    // Default: MQTT_3_1_1
    .clientId("xxxxxx")             // Default: a mica-mqtt prefix followed by the nanosecond time in base 36
    .connect();                     // Connect
// Subscribe to messages; the other subXxx methods work similarly
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// Unsubscribe
client.unSubscribe("/test/#");
// Publish a message
client.publish("/test/client", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)));
// Disconnect
client.disconnect();
// Reconnect
client.reconnect();
// Stop
client.stop();
6.3 mica-mqtt server
// Note: to accept more connections with less memory, add the JVM parameter -Xss129k
MqttServer mqttServer = MqttServer.create()
    // Default: 127.0.0.1
    .ip("127.0.0.1")
    // Default: 1883
    .port(1883)
    // Default: 8092 (the default maximum MQTT message size). It can be lowered to save memory;
    // if a message is larger than the buffer, t-io will try to parse it over multiple reads.
    // Set it according to your actual workload.
    .readBufferSize(512)
    // Custom authentication
    .authHandler((clientId, userName, password) -> true)
    // Message listener
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
    })
    .debug() // Enable t-io debug logging
    .start();
// Send to a single client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
// Send to all online clients subscribed to this topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
// Stop the server
mqttServer.stop();
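The read-buffer comment in the server example notes that a message larger than the buffer is parsed over several attempts. For that to work, a decoder has to know how many bytes a complete packet needs, which in MQTT comes from the variable-length "Remaining Length" field of the fixed header. The sketch below shows that standard decoding as defined by the MQTT specification. It is not mica-mqtt's decoder, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing MQTT "Remaining Length" decoding; not part of mica-mqtt. */
final class RemainingLengthDecoder {

    /** Returned when the buffer does not yet contain the whole length field. */
    static final int INCOMPLETE = -1;

    private RemainingLengthDecoder() {
    }

    /**
     * Computes the total size of the packet starting at the buffer's current position
     * (fixed header plus remaining length) without changing the buffer's position.
     */
    static int neededLength(ByteBuffer buffer) {
        int start = buffer.position();
        int multiplier = 1;
        int value = 0;
        // Byte 0 holds the packet type and flags; the length field uses 1 to 4 bytes after it.
        for (int i = 1; i <= 4; i++) {
            if (start + i >= buffer.limit()) {
                return INCOMPLETE; // wait for more data to arrive
            }
            int encoded = buffer.get(start + i) & 0xFF;
            value += (encoded & 0x7F) * multiplier; // the low 7 bits carry data
            if ((encoded & 0x80) == 0) {
                // The high bit is clear, so this was the last length byte.
                return 1 + i + value;
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("Malformed remaining length: more than 4 bytes");
    }
}
```

A caller would compare the returned value with the number of readable bytes and with a configured maximum before attempting to decode the packet body.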
7. Cluster demonstration
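As a rough illustration of the Redis Pub/Sub clustering idea described in the introduction, the sketch below replaces Redis with an in-memory channel. Each broker node forwards a client's publish to the shared channel, and every node subscribed to that channel delivers the message to its own local subscribers. All class names here are hypothetical, topics are matched exactly (no wildcards), and none of this is the mica-mqtt-broker code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for a Redis Pub/Sub channel. */
final class ChannelSketch {

    private final List<BiConsumer<String, String>> listeners = new ArrayList<>();

    void subscribe(BiConsumer<String, String> listener) {
        listeners.add(listener);
    }

    /** Fans the message out to every subscribed node, including the sender. */
    void publish(String topic, String payload) {
        for (BiConsumer<String, String> listener : listeners) {
            listener.accept(topic, payload);
        }
    }
}

/** Hypothetical broker node that only knows about its own locally connected clients. */
final class BrokerNodeSketch {

    private final ChannelSketch channel;
    private final Map<String, List<String>> localSubscribers = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    BrokerNodeSketch(ChannelSketch channel) {
        this.channel = channel;
        // Every node listens on the shared channel for messages from any node.
        channel.subscribe(this::deliverLocally);
    }

    /** Registers a locally connected client as a subscriber of an exact topic. */
    void subscribeClient(String clientId, String topic) {
        localSubscribers.computeIfAbsent(topic, key -> new ArrayList<>()).add(clientId);
    }

    /** Called when a local client publishes: forward to the cluster instead of delivering directly. */
    void onClientPublish(String topic, String payload) {
        channel.publish(topic, payload);
    }

    /** Deliveries made by this node, recorded as "clientId <- topic: payload". */
    List<String> delivered() {
        return Collections.unmodifiableList(delivered);
    }

    private void deliverLocally(String topic, String payload) {
        for (String clientId : localSubscribers.getOrDefault(topic, Collections.emptyList())) {
            delivered.add(clientId + " <- " + topic + ": " + payload);
        }
    }
}
```

With two nodes sharing one channel, a message published by a client on the first node reaches subscribers connected to either node, which is the behavior a cluster needs. A real deployment would also have to handle wildcard filters, QoS, and the will and retained message storage listed in the changelog.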
8. Follow me
Follow the Dream Technology column on Juejin for more great articles.