A list,

Mica-mqtt is a simple, low latency, high performance MQTT Iot open source component based on t-IO implementation. See micA-MQTT Gitee source code micA-MQTT-Example module for details.

After several friends consulted micA-MQTT clustering I added a micA-MQTT-Broker module to demonstrate the clustering implementation based on Redis Pub/Sub.

Second, the function of

  • Supports MQTT V3.1, V3.1.1 and V5.0 protocols.
  • Support for websocket MQTT sub-protocols (mqTT.js is supported).
  • Support for HTTP REST apis,See HTTP API documentation for details.
  • Support MQTT Client client.
  • Supports MQTT Server server.
  • Support MQTT will message.
  • MQTT retention messages are supported.
  • Support for custom message (MQ) processing forwarding implementation clusters.
  • MQTT client Aliyun MQTT connection to Demo.
  • Support GraalVM compilation cost machine executable programs.
  • Support quick access to Spring Boot projects (MICA-MQTT-spring-boot-starter).
  • Mica-mqtt-spring-boot-starter connects to Prometheus + Grafana.
  • Implement clustering based on Redis Pub /sub, see detailsMica – MQTT broker module.

Third, to do

  • Optimized handling of MQTT sessions and support for some of the new MQTT V5.0 features.

Update records

  • ✨ add micA-MQTT-Broker module to implement MQTT clustering based on Redis PUB /sub.
  • ✨ micA-MQTT-Broker implements client state storage based on Redis.
  • ✨ micA-MQTT-Broker implements testeate, reserved message storage based on Redis.
  • ✨ MQTT-server HTTP API adjusts subscriptions and unsubscriptions to facilitate cluster processing.
  • ✨ mica-mqtt-spring-boot-example Adds an example of MQTT and HTTP API authentication.
  • ✨ Adds MQTT 5 all ReasonCode.
  • ✨ optimized decoding PacketNeededLength calculation.
  • 🐛 Fix will message, add message type.
  • 🐛 Fixes mqTT-Server retention message matching rules.

Fast access to Spring Boot

5.1 Adding a Dependency

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.1.2</version>
</dependency>
Copy the code

5.2 Server Configuration Example

mqtt:
  server:
    enabled: true               # Enable or not. Default: true
    ip: 127.0. 01.               The default server IP is 127.0.0.1
    port: 5883                  # port, default: 1883
    name: Mica-Mqtt-Server      # name, default: mica-mqtt-server
    buffer-allocator: HEAP      # heap memory and off-heap memory, default: heap memory
    heartbeat-timeout: 120000   # Heartbeat timeout, in milliseconds, default: 1000 x 120
    read-buffer-size: 8092      Buffer size for receiving data. Default: 8092
    max-bytes-in-message: 8092  # Maximum length of bytes for message parsing. Default: 8092
    debug: true                 Disable Prometheus indicator collection if it is enabled
    websocket-enable: true      Enable the websocket subprotocol by default
    websocket-port: 8083        # webSocket port, default: 8083
Copy the code

5.3 Server Implementable Interface (register as Spring Bean)

interface Whether must instructions
IMqttServerAuthHandler is Used for client authentication
IMqttMessageListener is News listening
IMqttConnectStatusListener is Connection status monitor
IMqttSessionManager no Session management
IMqttMessageStore Cluster yes, single-node no Wills and retention message stores
AbstractMqttMessageDispatcher Cluster yes, single-node no Message forwarding, (will, reservation message forwarding)
IpStatListener no T-io IP status monitoring

5.4 Monitoring Connection for Prometheus + Grafana

Thanks to the good design of T-IO, T-IOSTAT, which directly connects monitoring indicators, currently supports the following indicators, which will be improved in the future.

Supported index instructions
mqtt_connections_accepted Total number of connections received
mqtt_connections_closed Number of closed connections
mqtt_connections_size Current connection number
mqtt_messages_handled_packets Number of processed messages
mqtt_messages_handled_bytes Number of bytes of messages processed
mqtt_messages_received_packets Number of received messages
mqtt_messages_received_bytes Number of bytes of messages processed
mqtt_messages_send_packets Number of sent messages
mqtt_messages_send_bytes Number of sent message bytes

For more information about micA-MQtt-spring-boot-starter, see the documentation: gitee.com/596392912/m…

Common Java project access

6.1 the maven rely on

 <dependency>
   <groupId>net.dreamlu</groupId>
   <artifactId>mica-mqtt-core</artifactId>
   <version>1.1.2</version>
 </dependency>
Copy the code

6.2 MicA-MQTT client

 // Initialize MQTT client
 MqttClient client = MqttClient.create()
     .ip("127.0.0.1")
     .port(1883)                     // Default: 1883
     .username("admin")
     .password("123456")
     .version(MqttVersion.MQTT_5)    // Default: 3_1_1
     .clientId("xxxxxx")             // Default: the mica-mqtt - prefix and nanoseconds in base 36
     .connect();                     / / the connection
 
     // Message subscription, similar method subxxx
     client.subQos0("/test/#", (topic, payload) -> {
         logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
     });
     // Unsubscribe
     client.unSubscribe("/test/#");
 
     // Send a message
     client.publish("/test/client", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)));
 
     // Disconnect the connection
     client.disconnect();
     / / reconnection
     client.reconnect();
     / / stop
     client.stop();
Copy the code

6.3 MicA-MQTT Server

 // Note: Add the JVM parameter -xss129k in order to accept more links (lower memory)
 MqttServer mqttServer = MqttServer.create()
     // Default: 127.0.0.1
     .ip("127.0.0.1")
     // Default: 1883
     .port(1883)
     // Default: 8092 (MQTT default maximum message size), can be reduced to reduce memory, if the message is too large t-IO will try to parse multiple times (recommended according to the actual service situation)
     .readBufferSize(512)
     // Custom authentication
     .authHandler((clientId, userName, password) -> true)
     // Message listener
     .messageListener((clientId, topic, mqttQoS, payload) -> {
         logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
     })
     .debug() // Enable t-IO debug information logs
     .start();
 
 // Send to a client
 mqttServer.publish("clientId"."/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
 
 // send to all clients listening to this topic online
 mqttServer.publishAll("/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));
 
 // Stop the service
 mqttServer.stop();
Copy the code

7. Cluster demonstration

Eight, pay attention to me

Follow the dream technology nuggets column for more wonderful articles to discover.