1. Foreword
After more than a month of intensive work, mica-mqtt now has a reasonably stable release. In just over a month there have been 7 official releases and more than 100 commits. Thank you to everyone who followed the project, starred it, used it and sent feedback.
2. Introduction
mica-mqtt is a simple, low-latency, high-performance open source MQTT IoT component built on t-io. For usage details, see the mica-mqtt-example module in the mica-mqtt source code on Gitee.
3. Features
- Supports the MQTT v3.1, v3.1.1 and v5.0 protocols.
- Supports the MQTT sub-protocol over WebSocket (works with mqtt.js).
- Supports an HTTP REST API; see the HTTP API documentation for details.
- Supports an MQTT client.
- Supports an MQTT server.
- Supports MQTT will messages.
- Supports MQTT retained messages.
- Supports custom message (MQ) processing and forwarding to implement clustering.
- Includes a demo of the MQTT client connecting to Aliyun MQTT.
- Supports compilation to a native executable with GraalVM.
- Supports quick integration into Spring Boot projects (mica-mqtt-spring-boot-starter).
- mica-mqtt-spring-boot-starter integrates with Prometheus + Grafana.
4. Changelog
- :sparkles: mqtt-server: improved the connection-close log.
- :sparkles: mqtt-server: improved subscription handling; the QoS is now checked when the same topicFilter is subscribed again.
- :sparkles: mqtt-server: listeners are wrapped in try/catch so that an exception in a listener does not drop the connection.
- :sparkles: mqtt-server: improved topicFilters validation.
- :sparkles: mqtt-client: improved the check of subscription reasonCodes.
- :sparkles: mqtt-client: listeners are wrapped in try/catch so that an exception in a listener does not drop the connection.
- :sparkles: mqtt-client: added a session validity period.
- :sparkles: Code cleanup to reduce the issues reported by Codacy.
- :bug: mqtt-server: fixed a heartbeat time problem.
- :bug: mqtt-server: fixed duplicate messages when several subscriptions match the same message.
- :bug: mqtt-client: improved connection handling; subscriptions are now sent after the MQTT connection is established.
- :bug: Fixed a potential null pointer in MqttProperties.
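The duplicate-message fix above concerns overlapping subscriptions: one client may hold several subscriptions that match the same published topic. The following is a minimal sketch of one common way to handle this, collapsing the matches to a single delivery per client at the highest granted QoS. It is not mica-mqtt's actual code; `OverlapDedupSketch`, `Match` and `deliveries` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: de-duplicates overlapping subscription matches per client. */
final class OverlapDedupSketch {

    /** One matching subscription: the client that holds it and the QoS it was granted. */
    static final class Match {
        final String clientId;
        final int qos;

        Match(String clientId, int qos) {
            this.clientId = clientId;
            this.qos = qos;
        }
    }

    /** Returns one entry per client, keeping the highest QoS among that client's matches. */
    static Map<String, Integer> deliveries(List<Match> matches) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Match match : matches) {
            // merge() keeps the larger of the stored QoS and the new one
            result.merge(match.clientId, match.qos, Math::max);
        }
        return result;
    }
}
```

With matches for `c1` at QoS 0 and QoS 1 plus `c2` at QoS 0, the result contains one delivery for `c1` at QoS 1 and one for `c2` at QoS 0.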
5. Quick integration with Spring Boot
5.1 Adding a Dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.1.1</version>
</dependency>
5.2 Server Configuration Example
mqtt:
  server:
    enabled: true               # Whether to enable the server. Default: true
    ip: 127.0.0.1               # Server IP address. Default: 127.0.0.1
    port: 5883                  # Port. Default: 1883
    name: Mica-Mqtt-Server      # Name. Default: Mica-Mqtt-Server
    buffer-allocator: HEAP      # Heap or off-heap memory. Default: heap memory
    heartbeat-timeout: 120000   # Heartbeat timeout in milliseconds. Default: 1000 * 120
    read-buffer-size: 8092      # Buffer size for receiving data. Default: 8092
    max-bytes-in-message: 8092  # Maximum message length in bytes when parsing. Default: 8092
    debug: true                 # Debug mode; turning it off is recommended when Prometheus metrics collection is enabled
    websocket-enable: true      # Enable the WebSocket sub-protocol. Default: enabled
    websocket-port: 8083        # WebSocket port. Default: 8083
5.3 Interfaces the server can implement (register as Spring beans)
Interface | Required | Description |
---|---|---|
IMqttServerAuthHandler | Yes | Client authentication |
IMqttMessageListener | Yes | Message listener |
IMqttConnectStatusListener | Yes | Connection status listener |
IMqttSessionManager | No | Session management |
IMqttMessageStore | Cluster: yes; single node: no | Storage for will and retained messages |
AbstractMqttMessageDispatcher | Cluster: yes; single node: no | Message forwarding (including will and retained messages) |
IpStatListener | No | t-io IP status listener |
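As an illustration of the first row of the table, here is a minimal sketch of what an authentication handler might do. To stay self-contained it does not use mica-mqtt's `IMqttServerAuthHandler`; instead it defines a stand-in interface, `AuthHandlerSketch`, with the same `(clientId, userName, password) -> boolean` shape as the `authHandler` lambda in the server example in section 6.3. The interface, its method name `authenticate` and the class `StaticUserAuthHandler` are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

/** Hypothetical stand-in for an auth handler; not mica-mqtt's real interface. */
interface AuthHandlerSketch {
    boolean authenticate(String clientId, String userName, String password);
}

/** Checks the user name and password against a fixed in-memory table. */
final class StaticUserAuthHandler implements AuthHandlerSketch {
    private final Map<String, String> users;

    StaticUserAuthHandler(Map<String, String> users) {
        this.users = Map.copyOf(users);
    }

    @Override
    public boolean authenticate(String clientId, String userName, String password) {
        if (userName == null || password == null) {
            return false; // anonymous connections are rejected in this sketch
        }
        String expected = users.get(userName);
        if (expected == null) {
            return false; // unknown user
        }
        // MessageDigest.isEqual compares the bytes in constant time
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.UTF_8),
            password.getBytes(StandardCharsets.UTF_8));
    }
}
```

A real deployment would look users up in a database or an identity service and store password hashes rather than plain text; the in-memory map only keeps the sketch short.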
5.4 Custom server configuration (optional)
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {

    @Bean
    public MqttServerCustomizer activeRecordPluginCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // Customize the creator here; settings applied here override the YAML configuration
                System.out.println("----------------MqttServerCustomizer-----------------");
            }
        };
    }
}
5.5 MqttServerTemplate Example
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * @author wsq
 */
@Service
public class ServerService {
    @Autowired
    private MqttServerTemplate server;

    public boolean publish(String body) {
        // Send to all online clients subscribed to this topic
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
        return true;
    }
}
5.6 Clustering based on MQ message broadcast
- Implement IMqttConnectStatusListener to store device state.
- Implement IMqttMessageListener to forward messages to MQ; business services then consume the MQ messages as needed.
- Implement IMqttMessageStore to store will and retained messages.
- Implement AbstractMqttMessageDispatcher to send messages to MQ; MQ broadcasts them back to the MQTT cluster, and each MQTT node delivers them to its devices.
- Business messages are sent to MQ, MQ broadcasts them to the MQTT cluster, and the MQTT nodes deliver them to the devices.
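The broadcast flow in the list above can be sketched with plain Java objects. This is a simplified model under stated assumptions, not mica-mqtt code: `BroadcastBus` stands in for an MQ topic in broadcast mode, `ClusterNode` stands in for one MQTT server node, and subscriptions match topics exactly (no wildcards). All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Small demo: a message published on node-a reaches a device connected to node-b. */
final class ClusterDemo {
    public static void main(String[] args) {
        BroadcastBus bus = new BroadcastBus();
        ClusterNode nodeA = new ClusterNode("node-a", bus);
        ClusterNode nodeB = new ClusterNode("node-b", bus);
        nodeB.subscribe("device-1", "/test/123");
        nodeA.publish("/test/123", "hello");
        System.out.println(nodeB.delivered);
    }
}

/** Stand-in for an MQ in broadcast mode: every registered node receives every message. */
final class BroadcastBus {
    private final List<BiConsumer<String, String>> nodes = new ArrayList<>();

    void register(BiConsumer<String, String> node) {
        nodes.add(node);
    }

    void broadcast(String topic, String payload) {
        for (BiConsumer<String, String> node : nodes) {
            node.accept(topic, payload);
        }
    }
}

/** Stand-in for one MQTT server node, which only knows its locally connected devices. */
final class ClusterNode {
    private final String name;
    private final BroadcastBus bus;
    // topic -> ids of clients connected to this node and subscribed to that topic
    private final Map<String, List<String>> localSubscriptions = new HashMap<>();
    /** Record of deliveries made by this node, kept so the demo can inspect them. */
    final List<String> delivered = new ArrayList<>();

    ClusterNode(String name, BroadcastBus bus) {
        this.name = name;
        this.bus = bus;
        bus.register(this::deliverLocally);
    }

    void subscribe(String clientId, String topic) {
        localSubscriptions.computeIfAbsent(topic, key -> new ArrayList<>()).add(clientId);
    }

    /** Dispatcher role: hand the message to MQ instead of delivering it directly. */
    void publish(String topic, String payload) {
        bus.broadcast(topic, payload);
    }

    /** Called for every broadcast message; delivers only to this node's own subscribers. */
    private void deliverLocally(String topic, String payload) {
        for (String clientId : localSubscriptions.getOrDefault(topic, List.of())) {
            delivered.add(name + " -> " + clientId + ": " + payload);
        }
    }
}
```

The point of the design is that no node needs to know where a device is connected: every node sees every message and filters by its own local subscriptions.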
5.7 Monitoring with Prometheus + Grafana
Thanks to the good design of t-io, monitoring metrics are taken directly from t-io's statistics. The following metrics are currently supported, and more will be added over time.
Metric | Description |
---|---|
mqtt_connections_accepted | Total number of connections accepted |
mqtt_connections_closed | Number of closed connections |
mqtt_connections_size | Current number of connections |
mqtt_messages_handled_packets | Number of messages handled |
mqtt_messages_handled_bytes | Number of message bytes handled |
mqtt_messages_received_packets | Number of messages received |
mqtt_messages_received_bytes | Number of message bytes received |
mqtt_messages_send_packets | Number of messages sent |
mqtt_messages_send_bytes | Number of message bytes sent |
For more information about mica-mqtt-spring-boot-starter, see the documentation: gitee.com/596392912/m…
6. Plain Java project integration
6.1 Maven dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.1.1</version>
</dependency>
6.2 mica-mqtt client
// Initialize the MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)                     // Default: 1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)    // Default: 3_1_1
    .clientId("xxxxxx")             // Default: the mica-mqtt- prefix followed by the nanosecond time in base 36
    .connect();                     // Connect

// Subscribe to messages; other subXxx methods work similarly
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Unsubscribe
client.unSubscribe("/test/#");

// Publish a message
client.publish("/test/client", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)));

// Disconnect
client.disconnect();
// Reconnect
client.reconnect();
// Stop
client.stop();
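The client example subscribes to `/test/#` and publishes to `/test/client`. The following sketch shows how MQTT topic filters with the `+` and `#` wildcards are matched against topic names, following the wildcard rules of the MQTT specification. It is an illustration only, not mica-mqtt's matching code, and `TopicFilterSketch`, `isValidFilter` and `matches` are hypothetical names.

```java
/** Illustrative MQTT topic-filter validation and matching; not mica-mqtt's implementation. */
final class TopicFilterSketch {

    /** A filter is valid if '#' appears only as the last whole level and '+' only as a whole level. */
    static boolean isValidFilter(String filter) {
        if (filter == null || filter.isEmpty()) {
            return false;
        }
        String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            boolean isLast = i == levels.length - 1;
            if (level.contains("#") && !(level.equals("#") && isLast)) {
                return false;
            }
            if (level.contains("+") && !level.equals("+")) {
                return false;
            }
        }
        return true;
    }

    /** Returns true if the topic name matches the (valid) topic filter. */
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        // Topics starting with '$' are not matched by filters that start with a wildcard
        if (topic.startsWith("$") && (filterLevels[0].equals("#") || filterLevels[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent level and everything below it
            }
            if (i >= topicLevels.length) {
                return false; // filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
```

Under these rules `matches("/test/#", "/test/client")` is true, while `matches("/test/+", "/test/a/b")` is false because `+` covers exactly one level.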
6.3 mica-mqtt server
// Note: to accept more connections with less memory, add the JVM option -Xss129k
MqttServer mqttServer = MqttServer.create()
    // Default: 127.0.0.1
    .ip("127.0.0.1")
    // Default: 1883
    .port(1883)
    // Default: 8092 (the default maximum MQTT message size). A smaller value reduces memory use;
    // if a message is larger than the buffer, t-io will try to parse it several times.
    // Tune this to your actual workload.
    .readBufferSize(512)
    // Custom authentication
    .authHandler((clientId, userName, password) -> true)
    // Message listener
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
    })
    // SSL configuration
    .useSsl("", "", "")
    // Custom listener for clients going online and offline
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) {
        }

        @Override
        public void offline(String clientId) {
        }
    })
    // Custom message forwarding; clustering can be implemented with MQ broadcast
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) {
        }

        @Override
        public boolean send(Message message) {
            return false;
        }

        @Override
        public boolean send(String clientId, Message message) {
            return false;
        }
    })
    .debug() // Enable t-io debug logging
    .start();

// Send to a specific client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
// Send to all online clients subscribed to this topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
// Stop the server
mqttServer.stop();
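The comment on `readBufferSize` notes that a message larger than the read buffer is parsed over several attempts. The sketch below illustrates why a decoder may need more than one attempt: an MQTT packet states its own size in the variable-length "Remaining Length" field of the fixed header, so a decoder cannot finish until that many bytes have arrived. This is a generic illustration of the MQTT encoding, not t-io's or mica-mqtt's decoder, and `RemainingLengthSketch`, `packetLength` and `isComplete` are hypothetical names.

```java
/** Illustrative decoder for the MQTT fixed header's Remaining Length field. */
final class RemainingLengthSketch {

    /**
     * Returns the total packet length (fixed header plus remaining length),
     * or -1 if the bytes received so far do not yet contain the whole length field.
     */
    static int packetLength(byte[] buffer, int available) {
        int value = 0;
        int multiplier = 1;
        int index = 1; // byte 0 holds the packet type and flags
        while (true) {
            if (index > 4) {
                throw new IllegalArgumentException("Remaining Length uses at most 4 bytes");
            }
            if (index >= available) {
                return -1; // the length field itself is still incomplete
            }
            int encoded = buffer[index] & 0xFF;
            value += (encoded & 0x7F) * multiplier; // low 7 bits carry data
            multiplier *= 128;
            index++;
            if ((encoded & 0x80) == 0) {
                break; // top bit clear: this was the last length byte
            }
        }
        return index + value;
    }

    /** True once the buffer holds the entire packet, so decoding can proceed. */
    static boolean isComplete(byte[] buffer, int available) {
        int total = packetLength(buffer, available);
        return total >= 0 && available >= total;
    }
}
```

For example, the header bytes `0x30 0xC1 0x02` announce a remaining length of 321, so the full packet is 324 bytes. With a 512-byte read buffer that fits in one read, but a packet of several kilobytes would need several reads before `isComplete` returns true.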
7. Demo
8. Follow me
Follow the Dream Technology column on Juejin for more articles.