Why MQTT

Plenty of people can explain the definition of MQTT, so this article skips the lofty theory. The aim is to use the simplest, most intuitive approach so that anyone new to MQTT can put it to work as quickly as possible.

Let’s start with the situations that call for MQTT:

  1. You need a message server
  2. Different applications or devices interact frequently
  3. One-to-many messaging may be involved

Based on the three points outlined above, we can probably see that MQTT is best suited to scenarios where messages are an important part of the system and participate in the business-critical logic of the system

MQTT, start!

Having decided to use it, the first thing we need to look at is how to get MQTT to work. After all, it’s not as simple as adding a dependency to Maven

There are two things we need to do:

  1. Download the EMQX message server to act as the broker
  2. Add the following dependency to the Maven pom.xml
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-mqtt</artifactId>  
    <version>5.3.2.RELEASE</version>
</dependency>

Once both steps are done, start the EMQX server and our MQTT journey officially begins

Usage

There are two ways to use MQTT in Spring Boot code:

  1. Use the message channel concept of Spring Integration
  2. Use the traditional client concept

The first carries a heavier mental burden, but once I got it working with automatic registration (by copying existing work and reinventing a few wheels), it turned out to be much more convenient than the second

Before we get into the code, let’s take a look at some of the most common concepts in use:

  • Topic: The main route along which MQTT messages travel. We publish to a topic, subscribe to a topic, and read messages from a topic for business-logic processing; the topic is the channel for messages
  • Producer: The sender of an MQTT message, who publishes the message to a topic
  • Consumer: The recipient of MQTT messages, who subscribes to the topics it needs and receives messages from them
  • Broker: The message forwarder that carries messages between producers and consumers. EMQX is our broker, and we do not need to care how it is implemented

In practice, the MQTT workflow is as follows: the producer sends a message to the topic -> the broker delivers the message -> the consumers who subscribe to the topic get the message and perform the corresponding business logic
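To make the producer -> broker -> consumer flow concrete, here is a minimal sketch in plain Java. It is only a toy: ToyBroker is a hypothetical in-memory class invented for this illustration, and it involves neither MQTT nor EMQX, nor any networking. It simply shows that a producer only needs to know the topic, while the broker fans the message out to every subscriber.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory stand-in for a broker (hypothetical, not MQTT and not EMQX).
// It only illustrates the publish -> deliver -> consume flow.
class ToyBroker {
    // topic name -> consumers subscribed to that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // A consumer registers interest in a topic
    void subscribe(String topic, Consumer<String> consumer) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
    }

    // A producer publishes to a topic; the broker delivers the message to every
    // subscriber of that topic and returns how many consumers received it
    int publish(String topic, String message) {
        List<Consumer<String>> consumers =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> consumer : consumers) {
            consumer.accept(message);
        }
        return consumers.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Two consumers subscribe to the same topic (one-to-many messaging)
        broker.subscribe("test01", msg -> System.out.println("consumer A got: " + msg));
        broker.subscribe("test01", msg -> System.out.println("consumer B got: " + msg));
        // The producer only knows the topic, not the consumers
        broker.publish("test01", "hello");
        // Nobody subscribed to this topic, so nothing is delivered
        broker.publish("test02", "ignored");
    }
}
```

A real broker adds everything this toy leaves out (network transport, sessions, delivery guarantees), which is exactly why we delegate that work to EMQX.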

Client mode

This mode is essentially the same as working with a traditional database or Redis connection, so anyone with development experience should find it easy to handle. The question to consider when creating the factory is how it hands out clients: as a singleton, as a prototype, or from a pool?

We use the singleton pattern for this introduction
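For readers who want to compare the three options, the sketch below contrasts them side by side. FakeClient is a hypothetical stand-in for a real MQTT client, and the class and method names are invented for this illustration; the point is only the difference in how instances are handed out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ClientStrategies {
    // Hypothetical stand-in for an MQTT client; each instance gets a unique id
    static class FakeClient {
        private static final AtomicInteger COUNTER = new AtomicInteger();
        final int id = COUNTER.incrementAndGet();
    }

    // Singleton: one shared client, created lazily on first use
    private static FakeClient shared;

    static synchronized FakeClient singleton() {
        if (shared == null) {
            shared = new FakeClient();
        }
        return shared;
    }

    // Prototype: a fresh client for every caller
    static FakeClient prototype() {
        return new FakeClient();
    }

    // Pool: a fixed set of clients that callers borrow and give back
    static class Pool {
        private final BlockingQueue<FakeClient> idle;

        Pool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new FakeClient());
            }
        }

        // Returns null when every client is currently in use
        FakeClient borrow() {
            return idle.poll();
        }

        void giveBack(FakeClient client) {
            idle.offer(client);
        }
    }

    public static void main(String[] args) {
        System.out.println(singleton() == singleton()); // same instance both times
        System.out.println(prototype() == prototype()); // two different instances
        Pool pool = new Pool(2);
        FakeClient client = pool.borrow();
        pool.giveBack(client);
    }
}
```

A singleton is the simplest choice and is enough for an introduction; a pool only starts to pay off when many threads publish at the same time.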

Creating a factory class

First, let’s create a factory.

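import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttFactory.class);

    // Connection settings. MqttProperties is the project's own configuration class;
    // it must be assigned before getInstance() is first called
    private static MqttProperties configuration;

    private static MqttClient client;

    /**
     * Get the client instance.
     * Singleton mode: return the client if it exists, initialize it if it does not.
     * Synchronized so that two threads cannot both create a client.
     */
    public static synchronized MqttClient getInstance() {
        if (client == null) {
            init();
        }
        return client;
    }

    /**
     * Initialize the client.
     */
    public static synchronized void init() {
        try {
            client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());
            // MQTT connection options
            MqttConnectOptions options = new MqttConnectOptions();
            // Enable automatic reconnection. For other parameters, see MqttConnectOptions
            options.setAutomaticReconnect(true);
            if (!client.isConnected()) {
                client.connect(options);
            }
        } catch (MqttException e) {
            LOGGER.error(String.format("MQTT: Failed to connect to message server [%s]", configuration.getAddress()), e);
        }
    }
}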

For the specific configuration of MQTT, see MqttConnectOptions, which will not be explained here

One more word: the official documentation is always more reliable than a blog post!!

Creating a utility class

Next, we create MqttUtil, which is used to send messages and subscribe to topics

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;

public class MqttUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttUtil.class);

    /** Send a message: serialize data (the message content) to JSON and publish it to topic. */
    public static void send(String topic, Object data) {
        // Get the client instance
        MqttClient client = MqttFactory.getInstance();
        ObjectMapper mapper = new ObjectMapper();
        try {
            // Convert the message to a JSON string
            String json = mapper.writeValueAsString(data);
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
        } catch (JsonProcessingException e) {
            LOGGER.error(String.format("MQTT: Topic [%s] failed to convert message to JSON", topic), e);
        } catch (MqttException e) {
            LOGGER.error(String.format("MQTT: Topic [%s] failed to send message", topic), e);
        }
    }

    /** Subscribe to topic; listener is the message listener that processes incoming messages. */
    public static void subscribe(String topic, IMqttMessageListener listener) {
        MqttClient client = MqttFactory.getInstance();
        try {
            client.subscribe(topic, listener);
        } catch (MqttException e) {
            LOGGER.error(String.format("MQTT: Failed to subscribe to topic [%s]", topic), e);
        }
    }
}

Subscribing to a topic requires an IMqttMessageListener, so let’s create an implementation of it

import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.*;

public class MessageListener implements IMqttMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class);

    /** Process a message: topic is the topic it arrived on, mqttMessage is the message itself. */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        LOGGER.info(String.format("MQTT: Subscription topic [%s] sent message [%s]", topic, new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        // Subscribe to topic test01 and process its messages using a MessageListener
        MqttUtil.subscribe("test01", new MessageListener());
    }
}

Both sending and subscribing are easy to follow, aren’t they?

Comfort never lasts, and what follows it is endless torture and emptiness, so let’s take on the other mode, the one with the heavier mental burden!

Spring Integration

What is Spring Integration? I’m sorry. I don’t know. I don’t want to know

Why Use Spring Integration? Because it’s really easy to maintain

Most online tutorials cover Spring Integration, and perhaps because it was my first encounter with it, I could not follow them. So I gave up on them, adopted an auto-configuration approach written by a more experienced developer, and adjusted it to lighten the mental burden

Remember the concepts we covered earlier? Topic, producer, consumer

Spring Integration adds a few new concepts and adjusts the earlier ones:

  • Channel: The conduit through which messages are sent and received; every message passes through one
  • Client factory: Used to create MQTT clients, similar to the factory in client mode
  • Message adapter: Receives MQTT messages and converts them, but does not take part in the business logic
  • Inbound channel: Paired with the message adapter; the channel through which messages enter the platform
  • Outbound channel: Paired with the client factory; the channel through which messages leave the platform
  • Topic: Still the topic, nothing changes
  • Producer: Whoever owns the outbound channel
  • Consumer: Whoever owns the inbound channel

Once the definitions above have sunk in, the flow of this mode looks like this:

  • Producer: Create the outbound channel for the specified client factory -> send message
  • Consumer: Create the inbound channel for the specified message adapter -> receive message -> enter message interceptor -> business logic
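The producer and consumer flows above can be modelled in a few lines of plain Java. This is only a conceptual sketch: Channel and ChannelFlowSketch are hypothetical names invented here, none of this is the Spring Integration API, and the broker hop is faked by handing the bytes straight to the adapter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChannelFlowSketch {
    // A channel is just a pipe: whatever is sent in is handed to its subscriber
    static class Channel<T> {
        private Consumer<T> subscriber = message -> { };

        void subscribe(Consumer<T> subscriber) {
            this.subscriber = subscriber;
        }

        void send(T message) {
            subscriber.accept(message);
        }
    }

    // Wires producer -> outbound channel -> (faked broker) -> adapter -> inbound channel -> consumer
    static List<String> run(String... messages) {
        List<String> handled = new ArrayList<>();
        Channel<String> outbound = new Channel<>();
        Channel<String> inbound = new Channel<>();

        // Message adapter: converts raw bytes to text, contains no business logic
        Function<byte[], String> adapter = bytes -> new String(bytes, StandardCharsets.UTF_8);

        // Consumer side: owns the inbound channel and runs the business logic
        inbound.subscribe(text -> handled.add("handled: " + text));

        // Producer side: the outbound channel turns the message into bytes, which is
        // what would travel through the broker; here we pass them to the adapter directly
        outbound.subscribe(text -> {
            byte[] wire = text.getBytes(StandardCharsets.UTF_8);
            inbound.send(adapter.apply(wire));
        });

        for (String message : messages) {
            outbound.send(message);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run("hello", "world")); // prints [handled: hello, handled: world]
    }
}
```

The useful property to notice is the separation of duties: the producer only touches the outbound channel, the adapter only converts, and the business logic sits behind the inbound channel.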

In my opinion, this is in line with the Spring Boot philosophy of convention over configuration

The code for this part is relatively complex and will be explained in detail later

For the code, see Spring Boot Koala, the project I use for practice

Conclusion

MQTT, as a messaging service, meets most of our development needs, but there are some remaining issues that I have not yet thought through or put into practice:

  • How to use the QoS mechanism to prevent data loss
  • How to queue and order messages
  • How to use MQTT in cluster mode