1. Foreword

After more than a month of intensive work, mica-mqtt now has a reasonably stable version. In just over a month there were 7 official releases and more than 100 commits. Thanks to everyone for following and starring the project, using it, and giving feedback.

2. Introduction

mica-mqtt is a simple, low-latency, high-performance open source MQTT IoT component built on t-io. For detailed usage, see the mica-mqtt-example module in the mica-mqtt source code on Gitee.

3. Features

  • Supports the MQTT v3.1, v3.1.1 and v5.0 protocols.
  • Supports the MQTT sub-protocol over websocket (compatible with mqtt.js).
  • Supports an HTTP REST API; see the HTTP API documentation for details.
  • Supports an MQTT client.
  • Supports an MQTT server.
  • Supports MQTT will messages.
  • Supports MQTT retained messages.
  • Supports custom message (MQ) processing and forwarding to build a cluster.
  • Includes an MQTT client demo for connecting to Aliyun MQTT.
  • Supports compilation to a native executable with GraalVM.
  • Supports quick integration into Spring Boot projects (mica-mqtt-spring-boot-starter).
  • mica-mqtt-spring-boot-starter supports integration with Prometheus + Grafana.

4. Changelog

  • :sparkles: mqtt-server: improved the connection-close log.
  • :sparkles: mqtt-server: optimized subscriptions; qos is now checked when the same topicFilter is subscribed again.
  • :sparkles: mqtt-server: listeners are wrapped in try/catch so that an exception in a listener does not cause a disconnection.
  • :sparkles: mqtt-server: improved topicFilters validation.
  • :sparkles: mqtt-client: improved handling of subscription reasonCodes.
  • :sparkles: mqtt-client: listeners are wrapped in try/catch so that an exception in a listener does not cause a disconnection.
  • :sparkles: mqtt-client: added a session expiry setting.
  • :sparkles: Code optimizations to reduce the issues reported by Codacy.
  • :bug: mqtt-server: fixed a heartbeat time problem.
  • :bug: mqtt-server: fixed duplicate messages when multiple subscriptions match at the same time.
  • :bug: mqtt-client: improved connection handling; subscriptions are now sent after the MQTT connection is established.
  • :bug: Fixed a possible null pointer in MqttProperties.

5. Quick Integration with Spring Boot

5.1 Adding a Dependency

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.1.1</version>
</dependency>

5.2 Server Configuration Example

mqtt:
  server:
    enabled: true               # Whether to enable the server, default: true
    ip: 127.0.0.1               # Server IP address, default: 127.0.0.1
    port: 5883                  # Port, default: 1883
    name: Mica-Mqtt-Server      # Name, default: mica-mqtt-server
    buffer-allocator: HEAP      # Heap or off-heap memory, default: heap memory
    heartbeat-timeout: 120000   # Heartbeat timeout in milliseconds, default: 1000 * 120
    read-buffer-size: 8092      # Buffer size for receiving data, default: 8092
    max-bytes-in-message: 8092  # Maximum message length in bytes when parsing, default: 8092
    debug: true                 # Debug mode; recommended to turn off when Prometheus metrics collection is enabled
    websocket-enable: true      # Enable the websocket sub-protocol, enabled by default
    websocket-port: 8083        # Websocket port, default: 8083

5.3 Interfaces the Server Side Can Implement (register as Spring Beans)

Interface | Required | Description
--- | --- | ---
IMqttServerAuthHandler | yes | Client authentication
IMqttMessageListener | yes | Message listening
IMqttConnectStatusListener | yes | Connection status listening
IMqttSessionManager | no | Session management
IMqttMessageStore | cluster: yes, single node: no | Storage for will and retained messages
AbstractMqttMessageDispatcher | cluster: yes, single node: no | Message forwarding (will and retained message forwarding)
IpStatListener | no | t-io IP status listening
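
To make the authentication entry concrete, here is a minimal sketch of a username/password check. The `AuthHandler` interface below is a hypothetical stand-in that only mirrors the three-argument lambda shape `(clientId, userName, password) -> true` used in section 6.3; the real `IMqttServerAuthHandler` signature may differ, and the in-memory credential map is made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

// Hypothetical stand-in mirroring the (clientId, userName, password) lambda shape;
// this is NOT the actual mica-mqtt interface.
interface AuthHandler {
    boolean authenticate(String clientId, String userName, String password);
}

// In-memory credential check; a real deployment would query a database or an auth service.
class InMemoryAuthHandler implements AuthHandler {
    private final Map<String, String> passwordsByUser;

    InMemoryAuthHandler(Map<String, String> passwordsByUser) {
        this.passwordsByUser = passwordsByUser;
    }

    @Override
    public boolean authenticate(String clientId, String userName, String password) {
        if (userName == null || password == null) {
            return false; // reject anonymous clients
        }
        String expected = passwordsByUser.get(userName);
        if (expected == null) {
            return false; // unknown user
        }
        // Constant-time comparison avoids leaking how many leading bytes matched.
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.UTF_8),
            password.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a Spring Boot project, a class like this would be adapted to the real interface and registered as a bean so that the starter can pick it up.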

5.4 (Optional) User-defined Server Configuration

@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {

	@Bean
	public MqttServerCustomizer activeRecordPluginCustomizer() {
		return new MqttServerCustomizer() {
			@Override
			public void customize(MqttServerCreator creator) {
				// Customize the creator here; settings applied here override the configuration in the YAML file
				System.out.println("----------------MqttServerCustomizer-----------------");
			}
		};
	}
}

5.5 MqttServerTemplate Example

import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;

/**
 * @author wsq
 */
@Service
public class ServerService {
    @Autowired
    private MqttServerTemplate server;

    public boolean publish(String body) {
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
        return true;
    }
}
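
Note that `body.getBytes()` without an argument uses the JVM default charset, which on JDKs before 18 depends on the platform. A minimal sketch of an explicit UTF-8 round trip using only JDK classes follows; the `PayloadCodec` helper is made up for this example and is not part of mica-mqtt.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for converting between String bodies and ByteBuffer payloads.
class PayloadCodec {
    // Encode the text as UTF-8 regardless of the platform default charset.
    static ByteBuffer encode(String body) {
        return ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8));
    }

    // Decode the remaining bytes as UTF-8.
    static String decode(ByteBuffer payload) {
        // duplicate() shares the bytes but has its own position and limit,
        // so the caller's buffer can still be read afterwards.
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }
}
```

With such a helper, the publish call above could pass `PayloadCodec.encode(body)` instead of wrapping `body.getBytes()` directly.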

5.6 Cluster processing based on MQ message broadcast

  • Implement IMqttConnectStatusListener to store device state.
  • Implement IMqttMessageListener to forward messages to MQ; the business side then processes the MQ messages as needed.
  • Implement IMqttMessageStore to store will and retained messages.
  • Implement AbstractMqttMessageDispatcher to send messages to MQ; MQ broadcasts them back to the MQTT cluster, and MQTT delivers them to the devices.
  • Business messages are sent to MQ, MQ broadcasts them to the MQTT cluster, and MQTT delivers them to the devices.
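
The broadcast flow in the last two bullets can be sketched with an in-memory stand-in for the MQ. Everything here (`BroadcastBus`, `ClusterNode`, exact-match topics) is hypothetical and only illustrates the pattern; it uses neither the mica-mqtt API nor any real MQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for an MQ topic in broadcast mode: every registered node receives every message.
class BroadcastBus {
    private final List<ClusterNode> nodes = new ArrayList<>();

    void register(ClusterNode node) {
        nodes.add(node);
    }

    void broadcast(String topic, String payload) {
        for (ClusterNode node : nodes) {
            node.onBroadcast(topic, payload);
        }
    }
}

// One MQTT server in the cluster; it only knows the devices connected to itself.
class ClusterNode {
    private final String name;
    private final BroadcastBus bus;
    // topic -> ids of locally connected devices subscribed to it (exact match only)
    private final Map<String, List<String>> localSubscriptions = new HashMap<>();
    // Record of deliveries, kept so the example can be inspected
    final List<String> delivered = new ArrayList<>();

    ClusterNode(String name, BroadcastBus bus) {
        this.name = name;
        this.bus = bus;
        bus.register(this);
    }

    void subscribe(String deviceId, String topic) {
        localSubscriptions.computeIfAbsent(topic, k -> new ArrayList<>()).add(deviceId);
    }

    // Dispatcher role: do not deliver locally, hand the message to the MQ instead.
    void publish(String topic, String payload) {
        bus.broadcast(topic, payload);
    }

    // Called for every broadcast message; deliver only to local subscribers.
    void onBroadcast(String topic, String payload) {
        for (String deviceId : localSubscriptions.getOrDefault(topic, Collections.emptyList())) {
            delivered.add(name + " -> " + deviceId + ": " + payload);
        }
    }
}
```

Because the publishing node also receives its own broadcast, it must not deliver locally before handing the message to the MQ, otherwise its local devices would get the message twice.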

5.7 Monitoring with Prometheus + Grafana

Thanks to the good design of t-io, the monitoring metrics are taken directly from the t-io stat. The following metrics are currently supported, and more will be added in the future.

Metric | Description
--- | ---
mqtt_connections_accepted | Total number of accepted connections
mqtt_connections_closed | Number of closed connections
mqtt_connections_size | Current number of connections
mqtt_messages_handled_packets | Number of handled messages
mqtt_messages_handled_bytes | Number of handled message bytes
mqtt_messages_received_packets | Number of received messages
mqtt_messages_received_bytes | Number of received message bytes
mqtt_messages_send_packets | Number of sent messages
mqtt_messages_send_bytes | Number of sent message bytes
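
For orientation, this is roughly what such counters look like once exposed in the Prometheus text format, one `name value` sample per line. The sketch uses plain `AtomicLong` counters and a hand-written renderer; the `MqttStats` class and its methods are hypothetical and do not show how the starter itself is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder for a few of the connection metrics listed above.
class MqttStats {
    // Insertion order is kept so the rendered output is stable.
    private final Map<String, AtomicLong> values = new LinkedHashMap<>();

    MqttStats() {
        for (String name : new String[] {
                "mqtt_connections_accepted", "mqtt_connections_closed", "mqtt_connections_size"}) {
            values.put(name, new AtomicLong());
        }
    }

    // A new connection raises both the accepted total and the current size.
    void onConnect() {
        values.get("mqtt_connections_accepted").incrementAndGet();
        values.get("mqtt_connections_size").incrementAndGet();
    }

    // A closed connection raises the closed total and lowers the current size.
    void onClose() {
        values.get("mqtt_connections_closed").incrementAndGet();
        values.get("mqtt_connections_size").decrementAndGet();
    }

    // Renders one "name value" line per metric, newline-terminated.
    String render() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, AtomicLong> e : values.entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue().get()).append('\n');
        }
        return sb.toString();
    }
}
```

Prometheus would scrape text like this from an HTTP endpoint, and Grafana would then chart the stored series.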

For more information about mica-mqtt-spring-boot-starter, see the documentation: gitee.com/596392912/m…

6. Plain Java Project Integration

6.1 Maven Dependency

 <dependency>
   <groupId>net.dreamlu</groupId>
   <artifactId>mica-mqtt-core</artifactId>
   <version>1.1.1</version>
 </dependency>

6.2 mica-mqtt Client

 // Initialize MQTT client
 MqttClient client = MqttClient.create()
     .ip("127.0.0.1")
     .port(1883)                     // Default: 1883
     .username("admin")
     .password("123456")
     .version(MqttVersion.MQTT_5)    // Default: 3_1_1
     .clientId("xxxxxx")             // Default: "mica-mqtt-" prefix plus the nanosecond time in base 36
     .connect();                     // Connect
 
     // Subscribe to messages; there are similar subXxx methods
     client.subQos0("/test/#", (topic, payload) -> {
         logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
     });
     // Unsubscribe
     client.unSubscribe("/test/#");
 
     // Send a message
     client.publish("/test/client", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)));
 
     // Disconnect the connection
     client.disconnect();
     // Reconnect
     client.reconnect();
     // Stop
     client.stop();
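
The `/test/#` filter above relies on MQTT wildcard matching: `+` matches exactly one topic level, and `#` matches all remaining levels, including the parent level itself. A minimal sketch of that rule follows, written from the MQTT specification rather than taken from mica-mqtt. The `TopicFilters` class is hypothetical, and filter validation and the special handling of `$`-prefixed topics are left out.

```java
// Hypothetical helper illustrating MQTT topic filter matching.
class TopicFilters {
    static boolean matches(String filter, String topic) {
        // limit -1 keeps trailing empty levels, e.g. "a/" has the levels ["a", ""]
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it also matches the parent level
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        // Without a trailing '#', both must have the same number of levels
        return f.length == t.length;
    }
}
```

For example, `/test/#` matches both `/test/123` and `/test/a/b`, while `/test/+` matches `/test/123` but not `/test/a/b`.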

6.3 mica-mqtt Server

// Note: to accept more connections (by lowering memory usage), add the JVM parameter -Xss129k
MqttServer mqttServer = MqttServer.create()
 // Default: 127.0.0.1
 .ip("127.0.0.1")
 // Default: 1883
 .port(1883)
 // Default: 8092 (the default maximum MQTT message size). It can be lowered to save memory; if a message is larger, t-io will try to parse it over several reads. Set it according to the actual business scenario.
 .readBufferSize(512)
 // Custom authentication
 .authHandler((clientId, userName, password) -> true)
 // Message listener
 .messageListener((clientId, topic, mqttQoS, payload) -> {
	 logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
 })
 // SSL configuration
 .useSsl("", "", "")
 // Custom listener for clients going online and offline
 .connectStatusListener(new IMqttConnectStatusListener() {
	 @Override
	 public void online(String clientId) {
	 }

	 @Override
	 public void offline(String clientId) {
	 }
 })
 // Custom message forwarding; can be used to build a cluster with MQ broadcast
 .messageDispatcher(new IMqttMessageDispatcher() {
	 @Override
	 public void config(MqttServer mqttServer) {
	 }

	 @Override
	 public boolean send(Message message) {
		 return false;
	 }

	 @Override
	 public boolean send(String clientId, Message message) {
		 return false;
	 }
 })
 .debug() // Enable t-io debug logs
 .start();

// Send to a client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));

// Send to all online clients subscribed to this topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes()));

// Stop the service
mqttServer.stop();

7. Effect demonstration

8. Follow Me

Follow the Dream Technology column on Juejin for more articles.