Using MQTT in Java

This article is the first in a series on learning EMQX. It covers what MQTT is and how to get started with it quickly in Java.

MQTT

Message Queuing Telemetry Transport (MQTT) is a binary protocol used for communication between servers and low-power IoT devices.

It runs on top of TCP and, beyond basic publish-subscribe, offers different delivery guarantees such as “at most once” and “at least once.” Message recovery after a reconnection is achieved by storing the last acknowledged message.

It is lightweight in both design and implementation, which makes it well suited to unstable network environments.

RabbitMQ, by contrast, implements the Advanced Message Queuing Protocol (AMQP), which is powerful and reliable but not as lightweight.

POM dependencies

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- which actually relies on:
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
-->

Create a client and establish a connection

Establish a connection by creating a new client and connecting it to the EMQ broker address:

  1. Create the client: configure the EMQ broker address, the clientId (the current service name, which must be unique), and an MqttClientPersistence, which caches messages while they are in transit
  2. Configure the connection options: username, password, etc.
  3. Establish the connection
  4. Set the callback, which is notified when the connection is lost, when a message arrives, and when a message has been delivered
  5. Subscribe to messages by configuring each topic and its corresponding message listener

@PostConstruct
public void createClient() throws MqttException {

    try {
        client = new MqttClient(mqtt.getBroker(), mqtt.getClientId(), new MemoryPersistence());

        // MQTT connection options
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(mqtt.getUsername());
        connOpts.setPassword(mqtt.getPassword().toCharArray());
        // Start with a clean session (no state is kept from earlier connections)
        connOpts.setCleanSession(true);
        // Keep-alive (heartbeat) interval, in seconds
        connOpts.setKeepAliveInterval(180);

        // Establish the connection
        client.connect(connOpts);
        // Set the callback
        client.setCallback(new OnMessageCallback());

        // Subscribe to each topic with its own listener
        for (TopicListener topicListener : topicListeners) {
            client.subscribe(topicListener.getTopic(), topicListener);
        }
    } catch (MqttException me) {
        log.error("reason: {}", me.getReasonCode());
        log.error("msg: {}", me.getMessage());
        log.error("loc: {}", me.getLocalizedMessage());
        log.error("cause: {}", JSONUtil.toJsonStr(me.getCause()));
        throw me;
    }
}

@Component
static class OnMessageCallback implements MqttCallback {

    @Override
    public void connectionLost(Throwable cause) {
        // The connection to the broker was lost
        log.error("Connection down, cause: {}", cause.getMessage(), cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // Messages for subscriptions without their own listener are handled here
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("Message delivered successfully, deliveryComplete: {}", token.isComplete());
    }
}

Subscribe to messages

Method 1

Subscribe with client.subscribe("test.topic").

Messages for the subscription are then received in MqttCallback.messageArrived(topic, message).

Method 2

Subscribe with client.subscribe("test.topic", IMqttMessageListener).

This approach configures a listener for each topic; implement IMqttMessageListener.messageArrived(topic, message) to receive the subscribed messages.

The second method is recommended. Once the business logic becomes complex, implementing a listener per topic keeps the code clearer.

PS: The callback and the listeners are both invoked from CommsCallback.

CommsCallback.java
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception {
    boolean delivered = false;

    Enumeration<String> keys = callbacks.keys(); // Topic filters that have their own listener
    while (keys.hasMoreElements()) {
        String topicFilter = (String)keys.nextElement();
        // callback may already have been removed in the meantime, so a null check is necessary
        // Look up the message listener registered for this topic filter
        IMqttMessageListener callback = callbacks.get(topicFilter);
        if (callback == null) {
            continue;
        }
        // Check the topic against the filter (wildcards etc.)
        if (MqttTopic.isMatched(topicFilter, topicName)) {
            aMessage.setId(messageId);
            // On a match, invoke IMqttMessageListener.messageArrived()
            ((IMqttMessageListener)callback).messageArrived(topicName, aMessage);
            delivered = true;
        }
    }

    /* if the message hasn't been delivered to a per subscription handler, give it to the default handler */
    if (mqttCallback != null && !delivered) {
        aMessage.setId(messageId);
        // No listener matched, so fall back to MqttCallback.messageArrived()
        mqttCallback.messageArrived(topicName, aMessage);
        delivered = true;
    }

    return delivered;
}
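
One consequence of the dispatch logic above is that a message is handed to every listener whose filter matches, while the default callback only sees messages that no listener claimed. The following library-free sketch models that behaviour under simplified assumptions. DispatchSketch, its methods, and the reduced matching rule (exact match or a trailing "/#") are hypothetical and are not part of Paho's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of per-subscription dispatch with a default fallback (not Paho code).
class DispatchSketch {
    // Listeners keyed by topic filter, a stand-in for the callbacks table.
    private final Map<String, BiConsumer<String, String>> listeners = new LinkedHashMap<>();
    // Fallback handler, playing the role of MqttCallback.messageArrived.
    private final BiConsumer<String, String> defaultCallback;

    DispatchSketch(BiConsumer<String, String> defaultCallback) {
        this.defaultCallback = defaultCallback;
    }

    void subscribe(String topicFilter, BiConsumer<String, String> listener) {
        listeners.put(topicFilter, listener);
    }

    // Assumption: simplified matching, exact match or a trailing "/#" used as a prefix wildcard.
    private static boolean matches(String filter, String topic) {
        if (filter.endsWith("/#")) {
            return topic.startsWith(filter.substring(0, filter.length() - 1));
        }
        return filter.equals(topic);
    }

    boolean deliver(String topic, String payload) {
        boolean delivered = false;
        // Every listener whose filter matches receives the message.
        for (Map.Entry<String, BiConsumer<String, String>> entry : listeners.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().accept(topic, payload);
                delivered = true;
            }
        }
        // Only unclaimed messages reach the default callback.
        if (!delivered && defaultCallback != null) {
            defaultCallback.accept(topic, payload);
            delivered = true;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DispatchSketch dispatcher = new DispatchSketch((t, p) -> log.add("default:" + t));
        dispatcher.subscribe("sensor/data/tem", (t, p) -> log.add("exact:" + t));
        dispatcher.subscribe("sensor/#", (t, p) -> log.add("wildcard:" + t));

        dispatcher.deliver("sensor/data/tem", "21"); // both listeners match
        dispatcher.deliver("other/topic", "x");      // no listener matches
        System.out.println(log);
        // prints [exact:sensor/data/tem, wildcard:sensor/data/tem, default:other/topic]
    }
}
```

In practice this means overlapping subscriptions each run their own handler, so a per-topic listener should not assume it is the only consumer of a message.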

Topic rules

The topic used in SUBSCRIBE can contain wildcards: the single-level wildcard “+” and the multi-level wildcard “#”. Subscribing with a topic that contains a wildcard subscribes to every topic that matches it. To distinguish it from the topic name used in PUBLISH, the topic in SUBSCRIBE is called a Topic Filter.

  • Single-level wildcard “+”: “+” stands for exactly one level, whatever its value. For example, “sensor/+/tem”

    Can match:

    • sensor/data/tem
    • sensor/cmd/tem

    Cannot match:

    • sensor/data/01/tem
  • Multi-level wildcard “#”: unlike “+”, “#” stands for any number of levels. However, “#” must be the last character in the Topic Filter and must be preceded by “/”, unless the Topic Filter consists of the single character “#”. For example, “#” is a valid Topic Filter, while “sensor#” is not.

    For example, “sensor/data/#”

    Can match:

    • sensor/data
    • sensor/data/tem
    • sensor/data/tem/01
    • sensor/data/tem/01/02

    Cannot match:

    • sensor/cmd/tem

The matching itself is implemented in MqttTopic.isMatched(topicFilter, topicName), which is too long to paste here.
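
The rules above can be approximated in a few lines. What follows is a minimal sketch, not Paho's implementation: TopicFilterSketch, isValidFilter and matches are hypothetical names, and special cases such as topics beginning with “$” are deliberately left out.

```java
// Simplified model of MQTT topic filter rules (illustrative only).
class TopicFilterSketch {

    // Checks wildcard placement: "#" only as the whole last level, "+" only as a whole level.
    static boolean isValidFilter(String filter) {
        if (filter.isEmpty()) {
            return false;
        }
        String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            boolean last = i == levels.length - 1;
            if (level.contains("#") && !(level.equals("#") && last)) {
                return false;
            }
            if (level.contains("+") && !level.equals("+")) {
                return false;
            }
        }
        return true;
    }

    // Returns true if the topic name matches the filter (assumes the filter is valid).
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // "#" covers the parent level and everything below it.
                return true;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without "#", filter and topic must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("sensor/+/tem", "sensor/data/tem"));    // true
        System.out.println(matches("sensor/+/tem", "sensor/data/01/tem")); // false
        System.out.println(matches("sensor/data/#", "sensor/data"));       // true
        System.out.println(matches("sensor/data/#", "sensor/cmd/tem"));    // false
        System.out.println(isValidFilter("#"));                            // true
        System.out.println(isValidFilter("sensor#"));                      // false
    }
}
```

Splitting on “/” and comparing level by level keeps the two wildcards easy to reason about: “+” consumes one level, and “#” ends the comparison.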

Send a message

@Resource
private MqttClient client;

public void send(@RequestParam String message) {
    // Message content
    MqttMessage mqttMessage = new MqttMessage(message.getBytes());
    // QoS level for this message
    mqttMessage.setQos(2);
    try {
        // Publish the message to a topic
        client.publish("/test", mqttMessage);
    } catch (MqttException e) {
        log.error("Failed to publish message: {}", e.getMessage(), e);
    }
}

QoS (Quality of Service) indicates the delivery guarantee to use for a message. It can be set to 0, 1, or 2.

QoS 0: the message is delivered at most once (zero or one times). It is not persisted to disk and is not acknowledged over the network. This QoS is the fastest, but it should only be used for messages that can afford to be lost. If the server cannot process the message (because of an authorization problem, for example), the client is not told; MqttCallback.deliveryComplete(IMqttDeliveryToken) is still called once the message has been sent. This mode is also known as “fire and forget.”

QoS 1: the message is delivered at least once (one or more times). The message can only be delivered safely if it can be persisted, so the application must supply a persistence mechanism (in the code above, the MqttClientPersistence passed to the MqttClient constructor). If none is specified, the message is not delivered in the event of a client failure. The message is acknowledged over the network. This is the default QoS.

QoS 2: the message is delivered exactly once. It is persisted and acknowledged over the network in two phases. As with QoS 1, the message can only be delivered safely if it can be persisted, so the application must supply a persistence mechanism; if none is specified, the message is not delivered in the event of a client failure.

If persistence is not configured, QoS 1 and QoS 2 messages are still delivered in the event of a network or server problem, because the client keeps its state in memory. If the MQTT client shuts down or fails and persistence is not configured, delivery of QoS 1 and QoS 2 messages cannot be guaranteed, because the client-side state is lost.
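
The practical difference between “at least once” and “exactly once” shows up when a sender retransmits because it missed an acknowledgement. The toy receiver below sketches only that one aspect, under stated assumptions: QosReceiverSketch and its methods are hypothetical, everything is held in memory, and the real protocol's acknowledgement packets are reduced to a single release step.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy receiver that models duplicate handling only (not Paho or broker code).
class QosReceiverSketch {
    // Packet ids of QoS 2 messages that were received but not yet released by the sender.
    private final Set<Integer> pendingQos2Ids = new HashSet<>();
    // Payloads handed to the application, in arrival order.
    final List<String> delivered = new ArrayList<>();

    // Called for every arriving PUBLISH, including retransmitted copies.
    void onPublish(int packetId, int qos, String payload) {
        if (qos == 2 && !pendingQos2Ids.add(packetId)) {
            // QoS 2: this packet id is already known, so the copy is dropped.
            return;
        }
        // First QoS 2 copy, or any QoS 0/1 copy: hand it to the application.
        delivered.add(payload);
    }

    // Called when the sender releases a QoS 2 packet id, ending that exchange.
    void onRelease(int packetId) {
        pendingQos2Ids.remove(packetId);
    }

    public static void main(String[] args) {
        QosReceiverSketch qos1 = new QosReceiverSketch();
        qos1.onPublish(1, 1, "temp=21");
        qos1.onPublish(1, 1, "temp=21"); // sender missed the ack and retransmitted
        System.out.println("QoS 1 deliveries: " + qos1.delivered.size()); // QoS 1 deliveries: 2

        QosReceiverSketch qos2 = new QosReceiverSketch();
        qos2.onPublish(1, 2, "temp=21");
        qos2.onPublish(1, 2, "temp=21"); // retransmission is recognised and dropped
        qos2.onRelease(1);
        System.out.println("QoS 2 deliveries: " + qos2.delivered.size()); // QoS 2 deliveries: 1
    }
}
```

With QoS 1, the application therefore needs to tolerate duplicates (for example by making handlers idempotent), whereas QoS 2 pays for its guarantee with extra state and an extra network round trip.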

Reference documentation

EMQ official documentation