This article follows my previous one, part one of this source code analysis of MQTT, the Internet of Things protocol. This second part analyzes the source code for publishing MQTT messages and for receiving messages from the Broker. If you want to see how the MQTT connection is made, see my previous article:

juejin.cn/post/684490…

Publishing MQTT messages

An MQTT message is published through the publish method of the MqttAndroidClient class. Let's look at this method:

// MqttAndroidClient class:
    @Override
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
            boolean retained, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttPersistenceException {
        // Wrap the payload, the qos level and the retained flag in an MqttMessage
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        // Each message has its own token
        MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(
                this, userContext, callback, message);
        String activityToken = storeToken(token);
        IMqttDeliveryToken internalToken = mqttService.publish(clientHandle,
                topic, payload, qos, retained, null, activityToken);
        token.setDelegate(internalToken);
        return token;
    }

This method goes on to call mqttService.publish:

// MqttService class:
    public IMqttDeliveryToken publish(String clientHandle, String topic,
            byte[] payload, int qos, boolean retained,
            String invocationContext, String activityToken)
            throws MqttPersistenceException, MqttException {
        MqttConnection client = getConnection(clientHandle);
        return client.publish(topic, payload, qos, retained, invocationContext,
                activityToken);
    }

As explained in the previous article, connecting to MQTT initializes an MqttConnection, which is stored in a Map named connections and retrieved through the getConnection(clientHandle) method. Next, look at the client.publish method:

// MqttConnection class:
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
            boolean retained, String invocationContext, String activityToken) {
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                MqttServiceConstants.SEND_ACTION);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
                activityToken);
        resultBundle.putString(
                MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
                invocationContext);

        IMqttDeliveryToken sendToken = null;

        if ((myClient != null) && (myClient.isConnected())) {
            // The listener carries the resultBundle data
            IMqttActionListener listener = new MqttConnectionListener(resultBundle);
            try {
                MqttMessage message = new MqttMessage(payload);
                message.setQos(qos);
                message.setRetained(retained);
                sendToken = myClient.publish(topic, payload, qos, retained,
                        invocationContext, listener);
                storeSendDetails(topic, message, sendToken, invocationContext,
                        activityToken);
            } catch (Exception e) {
                handleException(resultBundle, e);
            }
        } else {
            resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                    NOT_CONNECTED);
            service.traceError(MqttServiceConstants.SEND_ACTION, NOT_CONNECTED);
            service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
        }

        return sendToken;
    }

Execution continues with myClient.publish. myClient is an MqttAsyncClient, which is initialized in the connect method of the MqttConnection class when the MQTT connection is made, as described in the previous article.

// MqttAsyncClient class:
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
            boolean retained, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttPersistenceException {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        return this.publish(topic, message, userContext, callback);
    }
    
    public IMqttDeliveryToken publish(String topic, MqttMessage message
        , Object userContext,
        IMqttActionListener callback) throws MqttException,MqttPersistenceException {
        final String methodName = "publish";
        // @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
        log.fine(CLASS_NAME, methodName, "111", new Object[]{topic, userContext, callback});

        // Checks if a topic is valid when publishing a message.
        MqttTopic.validate(topic, false/* wildcards NOT allowed */);

        MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.setMessage(message);
        token.internalTok.setTopics(new String[]{topic});

        MqttPublish pubMsg = new MqttPublish(topic, message);
        comms.sendNoWait(pubMsg, token);

        // @TRACE 112=<
        log.fine(CLASS_NAME, methodName, "112");

        return token;
    }

As this code shows, topic and message are now wrapped in an MqttPublish message, and execution continues with comms.sendNoWait. comms is a ClientComms, which is initialized in the constructor of MqttAsyncClient, as described in the next section.

// ClientComms class:
    public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "sendNoWait";
        // Check the connection state and the message type
        if (isConnected() ||
                (!isConnected() && message instanceof MqttConnect) ||
                (isDisconnecting() && message instanceof MqttDisconnect)) {
            if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
                //@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
                // message to buffer. message={0}
                log.fine(CLASS_NAME, methodName, "507", new Object[]{message.getKey()});
                if (disconnectedMessageBuffer.isPersistBuffer()) {
                    this.clientState.persistBufferedMessage(message);
                }
                disconnectedMessageBuffer.putMessage(message, token);
            } else {
                // We are not disconnected here, so the logic goes this way
                this.internalSend(message, token);
            }
        } else if (disconnectedMessageBuffer != null) {
            //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
            log.fine(CLASS_NAME, methodName, "508", new Object[]{message.getKey()});
            if (disconnectedMessageBuffer.isPersistBuffer()) {
                this.clientState.persistBufferedMessage(message);
            }
            disconnectedMessageBuffer.putMessage(message, token);
        } else {
            //@TRACE 208=failed: not connected
            log.fine(CLASS_NAME, methodName, "208");
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
    }

    void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "internalSend";
        ...
        try {
            // Persist if needed and send the message
            this.clientState.send(message, token);
        } catch (MqttException e) {
            // Note this code ***
            if (message instanceof MqttPublish) {
                this.clientState.undo((MqttPublish) message);
            }
            throw e;
        }
    }

The comms.sendNoWait method calls internalSend in the same class, and internalSend calls clientState.send(message, token) to continue publishing. The ClientState object is initialized in the constructor of ClientComms. Note the code in the catch block here, which is explained below.

// ClientState class:
    public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "send";
        ...
        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                /**
                 * actualInFlight: the number of messages actually in flight
                 * maxInflight: the maximum number in flight, which we can set in our
                 * own code through the connection option
                 * MqttConnectOptions.setMaxInflight(); the default is 10
                 */
                if (actualInFlight >= this.maxInflight) {
                    //@TRACE 613= sending {0} msgs at max inflight window
                    log.fine(CLASS_NAME, methodName, "613",
                            new Object[]{new Integer(actualInFlight)});

                    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                }

                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                //@TRACE 628=pending publish key={0} qos={1} message={2}
                log.fine(CLASS_NAME, methodName, "628",
                        new Object[]{new Integer(message.getMessageId()),
                                new Integer(innerMessage.getQos()), message});

                /**
                 * Handle the message according to its qos level:
                 * qos==0: the Broker does not return an acknowledgement.
                 * qos==1: the Broker returns a PUBACK.
                 * qos==2: the Broker must receive the message, and only once.
                 */
                switch (innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {
            ...
        }
    }

An MqttException is thrown when the number of messages in flight has reached the maxInflight limit. Recall the catch block in the internalSend method of ClientComms above:

    if (message instanceof MqttPublish) {
        this.clientState.undo((MqttPublish) message);
    }

Clearly, if the message is an MqttPublish, clientState.undo((MqttPublish) message) is executed. As we saw earlier, the publish method of the MqttAsyncClient class has already wrapped topic and message in an MqttPublish message, so undo is executed here:

// ClientState class:
    protected void undo(MqttPublish message) throws MqttPersistenceException {
        final String methodName = "undo";
        synchronized (queueLock) {
            //@TRACE 618=key={0} QoS={1}
            log.fine(CLASS_NAME, methodName, "618",
                    new Object[]{new Integer(message.getMessageId()),
                            new Integer(message.getMessage().getQos())});

            if (message.getMessage().getQos() == 1) {
                outboundQoS1.remove(new Integer(message.getMessageId()));
            } else {
                outboundQoS2.remove(new Integer(message.getMessageId()));
            }
            pendingMessages.removeElement(message);
            persistence.remove(getSendPersistenceKey(message));
            tokenStore.removeToken(message);
            if (message.getMessage().getQos() > 0) {
                // Free this message Id so it can be used again
                releaseMessageId(message.getMessageId());
                message.setMessageId(0);
            }

            checkQuiesceLock();
        }
    }

Normally the number of MQTT messages in flight will not reach the default maxInflight limit of 10. If it does in your project, you need to set a suitable limit manually.
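To make the window check concrete, here is a minimal, self-contained sketch. It does not use the Paho classes: InflightWindow and its methods are hypothetical names that only mimic the actualInFlight >= maxInflight test in ClientState.send. It also counts a message as in flight as soon as it is accepted, which is a simplification for illustration. In a real project the limit itself is changed through MqttConnectOptions.setMaxInflight(), as the comment in the source above mentions.

```java
/** Hypothetical stand-in for the in-flight bookkeeping done by ClientState. */
final class InflightWindow {
    private final int maxInflight;
    private int actualInFlight = 0;

    InflightWindow(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    /** Mirrors the check in send(): refuse a new publish once the window is full. */
    synchronized void send() {
        if (actualInFlight >= maxInflight) {
            throw new IllegalStateException("Too many publishes in progress");
        }
        actualInFlight++;
    }

    /** Called when the Broker acknowledges a publish, which frees one slot. */
    synchronized void acknowledged() {
        if (actualInFlight > 0) {
            actualInFlight--;
        }
    }

    synchronized int inFlight() {
        return actualInFlight;
    }

    /** Sends until the window rejects a message; returns how many were accepted. */
    static int fillWindow(InflightWindow window, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                window.send();
                accepted++;
            } catch (IllegalStateException windowFull) {
                break;
            }
        }
        return accepted;
    }
}
```

With a window of 10, fifteen attempts in a row are cut off after the tenth, and a slot only becomes available again once an acknowledgement arrives.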

Continuing with the logic in clientState.send(message, token): as the comments in the code indicate, MQTT decides how message delivery is guaranteed based on the qos level.

QoS levels

  • qos==0: sent at most once. There is no retry, the Broker does not return an acknowledgement, and the message may be lost.
  • qos==1: sent at least once. The message is guaranteed to reach the Broker, which must return a PUBACK acknowledgement; duplicate messages may be sent.
  • qos==2: the Broker is guaranteed to receive the message exactly once.

Depending on the qos level, a message with qos 1 or 2 is added to outboundQoS1 or outboundQoS2 respectively, both of type Hashtable. The subsequent logic uses them to make sure the message is sent and arrives.

Note: the maxInflight check takes precedence over the qos level. As the code shows, maxInflight is checked before the qos levels are distinguished.
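The ordering described in this note can be sketched in a few lines. This is an illustration under assumptions, not the Paho implementation: OutboundStore, enqueue and the String payloads are hypothetical, and the in-flight counter is incremented on enqueue for simplicity. Only the order of the steps (window check first, then bookkeeping by qos, then the pending queue) follows the source above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the bookkeeping order in ClientState.send. */
final class OutboundStore {
    final Map<Integer, String> outboundQoS1 = new HashMap<>();
    final Map<Integer, String> outboundQoS2 = new HashMap<>();
    final List<String> pendingMessages = new ArrayList<>();

    private final int maxInflight;
    private int inFlight = 0;

    OutboundStore(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    void enqueue(int messageId, int qos, String payload) {
        // 1. The window check comes first and applies to every qos level.
        if (inFlight >= maxInflight) {
            throw new IllegalStateException("Too many publishes in progress");
        }
        // 2. Only qos 1 and 2 are tracked until the Broker acknowledges them.
        switch (qos) {
            case 2:
                outboundQoS2.put(messageId, payload);
                break;
            case 1:
                outboundQoS1.put(messageId, payload);
                break;
            default:
                break; // qos 0: nothing to track, no acknowledgement expected
        }
        // 3. Every accepted message is queued for the sender.
        pendingMessages.add(payload);
        inFlight++;
    }
}
```

Because the window check runs before the switch, even a qos 0 message is rejected when the window is full.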

The last part of the code adds the message to pendingMessages, which is a Vector. As we saw in the previous article, the MQTT sender (CommsSender) polls pendingMessages for data and, if there is any, sends it through the socket's OutputStream. Data sent back by the Broker is received by the receiver.
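The hand-off between send and the sender thread is a classic wait/notify queue. The sketch below shows the idea with hypothetical classes (PendingQueue, SenderLoopDemo) and String messages; the real sender works on MqttWireMessage objects and writes them to the socket's OutputStream, which is replaced here by a plain list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/** Hypothetical queue guarded by a lock, like pendingMessages and queueLock. */
final class PendingQueue {
    private final Object queueLock = new Object();
    private final Vector<String> pendingMessages = new Vector<>();
    private boolean closed = false;

    /** Producer side: add the message and wake up the waiting sender. */
    void send(String message) {
        synchronized (queueLock) {
            pendingMessages.addElement(message);
            queueLock.notifyAll();
        }
    }

    void close() {
        synchronized (queueLock) {
            closed = true;
            queueLock.notifyAll();
        }
    }

    /** Consumer side: block until a message exists; null once closed and drained. */
    String take() throws InterruptedException {
        synchronized (queueLock) {
            while (pendingMessages.isEmpty() && !closed) {
                queueLock.wait();
            }
            return pendingMessages.isEmpty() ? null : pendingMessages.remove(0);
        }
    }
}

final class SenderLoopDemo {
    /** Runs a sender thread over the given messages and returns what it "sent". */
    static List<String> sendAll(String... messages) {
        PendingQueue queue = new PendingQueue();
        List<String> wire = new ArrayList<>(); // stands in for the OutputStream
        Thread sender = new Thread(() -> {
            try {
                String next;
                while ((next = queue.take()) != null) {
                    wire.add(next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        for (String message : messages) {
            queue.send(message);
        }
        queue.close();
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wire;
    }
}
```

The sender sleeps inside take() while the queue is empty, so it does not spin; notifyAll in send is what wakes it up.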

Listening for messages returned by the Broker

We will not go through the sending source code, but we will go through the receiving side to see how the data gets back to our own callback:

// CommsReceiver class:
    public void run() {
        recThread = Thread.currentThread();
        recThread.setName(threadName);
        final String methodName = "run";
        MqttToken token = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }

        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME, methodName, "852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // Check whether the message is an MQTT acknowledgement
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    // The token is normally not null because it was saved earlier
                    if (token != null) {
                        synchronized (token) {
                            // ...
                            clientState.notifyReceivedAck((MqttAck) message);
                        }
                    }
                    ...
                }
            } finally {
                receiving = false;
                runningSemaphore.release();
            }
        }
    }

As the code shows, the data returned by the Broker is handed to the clientState.notifyReceivedAck method:

// ClientState class:
    protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        final String methodName = "notifyReceivedAck";
        ...
        MqttToken token = tokenStore.getToken(ack);
        MqttException mex = null;

        if (token == null) {
            ...
        } else if (ack instanceof MqttPubRec) {
            // Returned for a qos==2 message
            MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
            this.send(rel, token);
        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
            // Returned for a qos==1/2 message; pass on the result
            notifyResult(ack, token, mex);
            // Do not remove publish / delivery token at this stage
            // do this when the persistence is removed later
        } else if (ack instanceof MqttPingResp) {
            // Heartbeat (ping) response
            ...
        } else if (ack instanceof MqttConnack) {
            // MQTT connection acknowledgement
            ...
        } else {
            notifyResult(ack, token, mex);
            releaseMessageId(ack.getMessageId());
            tokenStore.removeToken(ack);
        }

        checkQuiesceLock();
    }
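The branches for publish acknowledgements can be summarized with a small model. AckDispatcher and its Ack enum are hypothetical and use strings instead of wire messages; the sketch only illustrates that a PUBREC is answered with a PUBREL, while a PUBACK (qos 1) or PUBCOMP (qos 2) completes the publish.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the publish-related branches of notifyReceivedAck. */
final class AckDispatcher {
    enum Ack { PUBACK, PUBREC, PUBCOMP }

    /** Messages the client sends in response, in order. */
    final List<String> sent = new ArrayList<>();
    /** Message ids whose publish has been reported as complete. */
    final List<Integer> completed = new ArrayList<>();

    void notifyReceivedAck(Ack ack, int messageId) {
        switch (ack) {
            case PUBREC:
                // qos 2, first reply from the Broker: answer with PUBREL
                sent.add("PUBREL:" + messageId);
                break;
            case PUBACK:  // qos 1 is done after a single acknowledgement
            case PUBCOMP: // qos 2 is done after the second acknowledgement
                completed.add(messageId);
                break;
        }
    }
}
```

For a qos 2 publish with id 7 the sequence is PUBREC then PUBCOMP, which leaves one PUBREL in sent and the id 7 in completed.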

For a published message the flow ends up in the notifyResult(ack, token, mex) method, so let's look at it:

    protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
        final String methodName = "notifyResult";
        // Unblock any threads waiting on the token and save the ack
        token.internalTok.markComplete(ack, ex);
        // Notify the token that a response has arrived and mark it complete;
        // the state can be read through isComplete()
        token.internalTok.notifyComplete();

        // Let the user know an async operation has completed and then remove the token
        if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
            //@TRACE 648=key{0}, msg={1}, excep={2}
            log.fine(CLASS_NAME, methodName, "648", new Object[]{token.internalTok.getKey(), ack, ex});
            // Callback into CommsCallback
            callback.asyncOperationComplete(token);
        }
        // In some cases there is no ack because the operation failed
        if (ack == null) {
            //@TRACE 649=key={0},excep={1}
            log.fine(CLASS_NAME, methodName, "649", new Object[]{token.internalTok.getKey(), ex});
            callback.asyncOperationComplete(token);
        }
    }

// Token class:
    protected void markComplete(MqttWireMessage msg, MqttException ex) {
        final String methodName = "markComplete";
        //@TRACE 404=>key={0} response={1} excep={2}
        log.fine(CLASS_NAME, methodName, "404", new Object[]{getKey(), msg, ex});

        synchronized (responseLock) {
            // ACK means that everything was OK, so mark the message for garbage collection.
            if (msg instanceof MqttAck) {
                this.message = null;
            }
            this.pendingComplete = true;
            // Store the message in the response member; getWireMessage() returns it
            this.response = msg;
            this.exception = ex;
        }
    }

// Token class:
    protected void notifyComplete() {
        ...
        synchronized (responseLock) {
            ...
            if (exception == null && pendingComplete) {
                // Mark as completed; the state is read through isComplete()
                completed = true;
                pendingComplete = false;
            } else {
                pendingComplete = false;
            }
            responseLock.notifyAll();
        }
        ...
    }
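The two-step completion in Token (markComplete stores the response, notifyComplete flips the flag and wakes waiters) can be tried out in isolation. CompletionToken below is a hypothetical, simplified class with a String response; waitForCompletion is added here only to show what notifyAll is for and is not copied from the library.

```java
/** Hypothetical, simplified version of the completion logic in Token. */
final class CompletionToken {
    private final Object responseLock = new Object();
    private boolean pendingComplete = false;
    private boolean completed = false;
    private String response;
    private Exception exception;

    /** Step 1: remember the response (or the failure) and mark completion as pending. */
    void markComplete(String response, Exception ex) {
        synchronized (responseLock) {
            this.pendingComplete = true;
            this.response = response;
            this.exception = ex;
        }
    }

    /** Step 2: complete only if no exception was recorded, then wake all waiters. */
    void notifyComplete() {
        synchronized (responseLock) {
            if (exception == null && pendingComplete) {
                completed = true;
            }
            pendingComplete = false;
            responseLock.notifyAll();
        }
    }

    boolean isComplete() {
        synchronized (responseLock) {
            return completed;
        }
    }

    String getResponse() {
        synchronized (responseLock) {
            return response;
        }
    }

    /** Blocks until completed or until the timeout expires; returns the final state. */
    boolean waitForCompletion(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (responseLock) {
            while (!completed) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    responseLock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```

A token that was marked with an exception never becomes complete, which is why the callback layer later checks the exception to choose between the success and failure paths.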

The MqttWireMessage has now been saved in the token and the asynchronous operation is complete, so the asyncOperationComplete method of CommsCallback is called:

// CommsCallback class:
    public void asyncOperationComplete(MqttToken token) {
        final String methodName = "asyncOperationComplete";

        if (running) {
            // invoke callbacks on callback thread
            completeQueue.addElement(token);
            synchronized (workAvailable) {
                // @TRACE 715=new workAvailable. key={0}
                log.fine(CLASS_NAME, methodName, "715", new Object[]{token.internalTok.getKey()});
                workAvailable.notifyAll();
            }
        } else {
            // invoke async callback on invokers thread
            try {
                handleActionComplete(token);
            } catch (Throwable ex) {
                // Users code could throw an Error or Exception e.g. in the case
                // of class NoClassDefFoundError
                // @TRACE 719=callback threw ex:
                log.fine(CLASS_NAME, methodName, "719", null, ex);

                // Shutdown likely already in progress but no harm to confirm
                clientComms.shutdownConnection(null, new MqttException(ex));
            }
        }
    }

CommsCallback has been running since the MQTT connection was made, so running is true and the token is added to the completeQueue. Like the sender, CommsCallback keeps polling for data. Now that completeQueue contains data, the run method of CommsCallback does the following:

// CommsCallback class:
    public void run() {
        ...
        while (running) {
            try {
                ...
                if (running) {
                    // Check for deliveryComplete callbacks...
                    MqttToken token = null;
                    synchronized (completeQueue) {
                        // completeQueue is not empty
                        if (!completeQueue.isEmpty()) {
                            // Get the first token
                            token = (MqttToken) completeQueue.elementAt(0);
                            completeQueue.removeElementAt(0);
                        }
                    }
                    if (null != token) {
                        // The token is not null, so run handleActionComplete
                        handleActionComplete(token);
                    }
                    ...
                }

                if (quiescing) {
                    clientState.checkQuiesceLock();
                }

            } catch (Throwable ex) {
                ...
            } finally {
                ...
            }
        }
    }

    private void handleActionComplete(MqttToken token)
            throws MqttException {
        final String methodName = "handleActionComplete";
        synchronized (token) {
            // isComplete() has already been set to true
            if (token.isComplete()) {
                // Finish by doing any post processing such as delete
                // from persistent store but only do so if the action
                // is complete
                clientState.notifyComplete(token);
            }
            // Unblock any waiters and, if pending complete, set completed now
            token.internalTok.notifyComplete();

            if (!token.internalTok.isNotified()) {
                ...
                // Now call the async action completion callback
                fireActionEvent(token);
            }
            ...
        }
    }

run calls handleActionComplete, which in turn calls clientState.notifyComplete(token) and fireActionEvent(token). First look at notifyComplete:

// ClientState class:
    protected void notifyComplete(MqttToken token) throws MqttException {
        final String methodName = "notifyComplete";
        // Get the message returned by the Broker that was saved in the token, as described above
        MqttWireMessage message = token.internalTok.getWireMessage();

        if (message != null && message instanceof MqttAck) {
            ...
            MqttAck ack = (MqttAck) message;

            if (ack instanceof MqttPubAck) {
                // qos 1: remove the message from persistence
                persistence.remove(getSendPersistenceKey(message));
                persistence.remove(getSendBufferedPersistenceKey(message));
                outboundQoS1.remove(new Integer(ack.getMessageId()));
                decrementInFlight();
                releaseMessageId(message.getMessageId());
                tokenStore.removeToken(message);
                // @TRACE 650=removed Qos 1 publish. key={0}
                log.fine(CLASS_NAME, methodName, "650",
                        new Object[]{new Integer(ack.getMessageId())});
            } else if (ack instanceof MqttPubComp) {
                ...
            }

            checkQuiesceLock();
        }
    }

Look again at the fireActionEvent(token) method:

// CommsCallback class:
    public void fireActionEvent(MqttToken token) {
        final String methodName = "fireActionEvent";

        if (token != null) {
            IMqttActionListener asyncCB = token.getActionCallback();
            if (asyncCB != null) {
                if (token.getException() == null) {
                    ...
                    asyncCB.onSuccess(token);
                } else {
                    ...
                    asyncCB.onFailure(token, token.getException());
                }
            }
        }
    }
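The path from asyncOperationComplete through the polling loop to fireActionEvent can be modelled without threads. In the sketch below, CallbackDispatcher, its nested Token and ActionListener, and pollOnce are all hypothetical simplifications; pollOnce corresponds to one pass of the while loop in run.

```java
import java.util.Vector;

/** Hypothetical model of the completion queue handled by CommsCallback. */
final class CallbackDispatcher {
    interface ActionListener {
        void onSuccess(String key);
        void onFailure(String key, Exception cause);
    }

    static final class Token {
        final String key;
        final ActionListener listener;
        final Exception exception; // null means the operation succeeded

        Token(String key, ActionListener listener, Exception exception) {
            this.key = key;
            this.listener = listener;
            this.exception = exception;
        }
    }

    private final Vector<Token> completeQueue = new Vector<>();

    /** Called by the receiving side once a token has been completed. */
    void asyncOperationComplete(Token token) {
        completeQueue.addElement(token);
    }

    /** One pass of the loop: take the first token, if any, and fire its callback. */
    boolean pollOnce() {
        Token token = null;
        synchronized (completeQueue) {
            if (!completeQueue.isEmpty()) {
                token = completeQueue.elementAt(0);
                completeQueue.removeElementAt(0);
            }
        }
        if (token == null) {
            return false;
        }
        fireActionEvent(token);
        return true;
    }

    private void fireActionEvent(Token token) {
        ActionListener asyncCB = token.listener;
        if (asyncCB == null) {
            return;
        }
        if (token.exception == null) {
            asyncCB.onSuccess(token.key);
        } else {
            asyncCB.onFailure(token.key, token.exception);
        }
    }
}
```

Queuing the token instead of calling the listener directly keeps user callbacks off the receiving thread, so a slow callback does not block reading from the network.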

So who is asyncCB? Let's look at token.getActionCallback():

// MqttToken class:
    public IMqttActionListener getActionCallback() {
        return internalTok.getActionCallback();
    }

// Token class:
    public IMqttActionListener getActionCallback() {
        return callback;
    }

We can look at the callback setting method directly and see where it came from:

// Token class:
    public void setActionCallback(IMqttActionListener listener) {
        this.callback = listener;
    }

// MqttToken class:
    public void setActionCallback(IMqttActionListener listener) {
        internalTok.setActionCallback(listener);
    }

// ConnectActionListener class:
    public void connect() throws MqttPersistenceException {
        // Initialize the MqttToken
        MqttToken token = new MqttToken(client.getClientId());
        // Set this class as the callback
        token.setActionCallback(this);
        token.setUserContext(this);
        ...
    }

asyncCB is therefore the ConnectActionListener, so execution is now in the onSuccess or onFailure method of the ConnectActionListener class. Let's look at onSuccess only:

// ConnectActionListener class:
    public void onSuccess(IMqttToken token) {
        if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
        }
        // Save the data from the Broker in userToken
        userToken.internalTok.markComplete(token.getResponse(), null);
        userToken.internalTok.notifyComplete();
        userToken.internalTok.setClient(this.client);

        comms.notifyConnect();

        if (userCallback != null) {
            userToken.setUserContext(userContext);
            userCallback.onSuccess(userToken);
        }

        if (mqttCallbackExtended != null) {
            String serverURI =
                    comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
            mqttCallbackExtended.connectComplete(reconnect, serverURI);
        }
    }

Who is userCallback here? As mentioned in the previous article, userCallback is the IMqttActionListener listener in MqttConnection.connect, so the call arrives at the listener defined in the connect method of the MqttConnection class:

// MqttConnection class:
    public void connect(MqttConnectOptions options, String invocationContext,
            String activityToken) {
        ...
        service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
                activityToken);
        resultBundle.putString(
                MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
                invocationContext);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                MqttServiceConstants.CONNECT_ACTION);

        try {
            ...
            IMqttActionListener listener = new MqttConnectionListener(resultBundle) {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    doAfterConnectSuccess(resultBundle);
                    service.traceDebug(TAG, "connect success!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    resultBundle.putString(
                            MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                            exception.getLocalizedMessage());
                    resultBundle.putSerializable(
                            MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                    service.traceError(TAG,
                            "connect fail, call connect to reconnect.reason:"
                                    + exception.getMessage());

                    doAfterConnectFail(resultBundle);
                }
            };

            if (myClient != null) {
                if (isConnecting) {
                    ...
                } else {
                    service.traceDebug(TAG, "myClient != null and the client is not connected");
                    service.traceDebug(TAG, "Do Real connect!");
                    setConnectingState(true);
                    myClient.connect(connectOptions, invocationContext, listener);
                }
            }
            // if myClient is null, then create a new connection
            else {
                ...
                myClient.connect(connectOptions, invocationContext, listener);
            }
        } catch (Exception e) {
            ...
        }
    }

As this code shows, the doAfterConnectSuccess method of the MqttConnection class is executed next:

// MqttConnection class:
    private void doAfterConnectSuccess(final Bundle resultBundle) {
        // Acquire the wake lock
        acquireWakeLock();
        service.callbackToActivity(clientHandle, Status.OK, resultBundle);
        deliverBacklog();
        setConnectingState(false);
        disconnected = false;
        // Release the wake lock
        releaseWakeLock();
    }

    private void deliverBacklog() {
        Iterator<StoredMessage> backlog = service.messageStore
                .getAllArrivedMessages(clientHandle);
        while (backlog.hasNext()) {
            StoredMessage msgArrived = backlog.next();
            Bundle resultBundle = messageToBundle(msgArrived.getMessageId(),
                    msgArrived.getTopic(), msgArrived.getMessage());
            // Note this action; it is used below
            resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                    MqttServiceConstants.MESSAGE_ARRIVED_ACTION);
            service.callbackToActivity(clientHandle, Status.OK, resultBundle);
        }
    }

Two of the calls in this method matter here: service.callbackToActivity(clientHandle, Status.OK, resultBundle) and deliverBacklog(). deliverBacklog() also ends by calling service.callbackToActivity, so let's look at service.callbackToActivity directly:

// MqttService class:
    void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
        // Send a broadcast
        Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);
        if (clientHandle != null) {
            callbackIntent.putExtra(
                    MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
        }
        callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
        if (dataBundle != null) {
            callbackIntent.putExtras(dataBundle);
        }
        LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
    }

The service.callbackToActivity method sends a broadcast. Who receives it? MqttAndroidClient extends BroadcastReceiver, so it is the receiver. Let's look at its onReceive method:

// MqttAndroidClient class:
    @Override
    public void onReceive(Context context, Intent intent) {
        Bundle data = intent.getExtras();

        String handleFromIntent = data
                .getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

        if ((handleFromIntent == null)
                || (!handleFromIntent.equals(clientHandle))) {
            return;
        }

        String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

        // Determine the action type of the message
        if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
            connectAction(data);
        } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
            connectExtendedAction(data);
        } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
            messageArrivedAction(data);
        } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
            subscribeAction(data);
        } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
            unSubscribeAction(data);
        } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
            // Callback for a successful publish
            sendAction(data);
        } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
            messageDeliveredAction(data);
        } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
                .equals(action)) {
            connectionLostAction(data);
        } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
            disconnected(data);
        } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
            traceAction(data);
        } else {
            mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
        }
    }
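The shape of onReceive (ignore broadcasts for other clients, then route by action) can be shown without Android. ActionRouter below is a hypothetical sketch: a Map of strings stands in for the Bundle, and the keys "clientHandle" and "action" stand in for CALLBACK_CLIENT_HANDLE and CALLBACK_ACTION. It uses a handler table instead of the if/else chain, which is a design variation and not how the library is written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical router that mimics the filtering and dispatching in onReceive. */
final class ActionRouter {
    private final String clientHandle;
    private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

    ActionRouter(String clientHandle) {
        this.clientHandle = clientHandle;
    }

    void register(String action, Consumer<Map<String, String>> handler) {
        handlers.put(action, handler);
    }

    /** Returns true if the data was meant for this client and a handler ran. */
    boolean onReceive(Map<String, String> data) {
        String handleFromIntent = data.get("clientHandle");
        // Broadcasts for other clients are ignored, as in the real onReceive.
        if (handleFromIntent == null || !handleFromIntent.equals(clientHandle)) {
            return false;
        }
        Consumer<Map<String, String>> handler = handlers.get(data.get("action"));
        if (handler == null) {
            return false; // corresponds to the final else branch: unknown action
        }
        handler.accept(data);
        return true;
    }
}
```

The client handle check matters because several clients in one app receive the same local broadcast, and each must react only to its own.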

As the code and the deliverBacklog method above show, the action we need to focus on now is MESSAGE_ARRIVED_ACTION, which leads to the messageArrivedAction(data) method:

// MqttAndroidClient class:
    private void messageArrivedAction(Bundle data) {
        if (callback != null) {
            String messageId = data
                    .getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
            String destinationName = data
                    .getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

            ParcelableMqttMessage message = data
                    .getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
            try {
                if (messageAck == Ack.AUTO_ACK) {
                    callback.messageArrived(destinationName, message);
                    mqttService.acknowledgeMessageArrival(clientHandle, messageId);
                } else {
                    message.messageId = messageId;
                    callback.messageArrived(destinationName, message);
                }

                // let the service discard the saved message details
            } catch (Exception e) {
                // Swallow the exception
            }
        }
    }

    @Override
    public void setCallback(MqttCallback callback) {
        this.callback = callback;
    }

As the messageArrivedAction method shows, the flow ends by calling the messageArrived method of callback. From the code above and below it we can see that this callback is our own class implementing the MqttCallback interface, which we passed in through setCallback after initializing MqttAndroidClient, as described in the previous article.

Listening for the publish result returned by the Broker

Look again at the sendAction(data) method:

    private void sendAction(Bundle data) {
        IMqttToken token = getMqttToken(data);
        // remove on delivery
        simpleAction(token, data);
    }

    private void simpleAction(IMqttToken token, Bundle data) {
        if (token != null) {
            Status status = (Status) data
                    .getSerializable(MqttServiceConstants.CALLBACK_STATUS);
            if (status == Status.OK) {
                // Called when publishing succeeded
                ((MqttTokenAndroid) token).notifyComplete();
            } else {
                Exception exceptionThrown = (Exception) data
                        .getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
                // Called when publishing failed
                ((MqttTokenAndroid) token).notifyFailure(exceptionThrown);
            }
        } else {
            if (mqttService != null) {
                mqttService.traceError(MqttService.TAG, "simpleAction : token is null");
            }
        }
    }

Next, look at the notifyComplete method of MqttTokenAndroid, which is called when publishing succeeds:

// MqttTokenAndroid class:
    void notifyComplete() {
        synchronized (waitObject) {
            isComplete = true;
            waitObject.notifyAll();
            if (listener != null) {
                listener.onSuccess(this);
            }
        }
    }

Here listener.onSuccess(this) is called. Who is this listener? It is the last parameter, callback, of the publish method of the MqttAndroidClient class, that is, our own callback class that listens for whether publishing succeeded. Above, the publish method of the MqttConnection class packs MqttServiceConstants.SEND_ACTION into a Bundle, and this Bundle is carried by the MqttConnectionListener created in the MqttConnection class. So when onSuccess of MqttConnectionListener is called, it calls service.callbackToActivity, which sends the broadcast with sendBroadcast; finally sendAction is called, which calls back our custom IMqttActionListener implementation. The onSuccess of MqttConnectionListener is itself invoked from the fireActionEvent method of the CommsCallback class, which is reached from the handleActionComplete and run() methods of CommsCallback.

So two listeners are involved in handling what the Broker returns: one for data the Broker sends to the client, and one for the result of messages published by the client. Both go through an IMqttActionListener, but the former only calls our custom MqttCallback from inside the listener callback. They are also set up in different places. The former is created in connect of the MqttConnection class, so there is only one per MQTT connection and it is always used to listen for data. The latter listens for the result of a publish, is passed in with every publish, and is created in publish of the MqttConnection class. That should make things a little clearer.

This concludes the source code analysis of publishing MQTT messages and receiving Broker data.

(Note: Please correct me if there is anything wrong.)