What is a Paho?

Paho is a subproject of the Eclipse IoT Open Source project. The Paho project provides the implementation of an open source reliable open standard messaging protocol with the goal of providing new, existing and emerging applications for machine-to-machine and the Internet of Things.

Paho is an open source MQTT client SDK. To be precise, it is a set of languages, including various implementation versions of C, Java, Python, javascript, Golang, etc. Among them, Paho.mqtt. Java is the implementation version of Java language, which is the protagonist of this article.

How do I use Paho?

Paho’s API design is very clear and concise. Publishers and subscribers to MQTT are both clients, so first create a client, then connect to a proxy server, and then subscribe or publish messages.

Import the paho package, using the Maven project as an example, and add the following dependencies to the POM.xml file

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
Copy the code

Creating a Client

// Proxy server address (server)
String broker = "TCP: / / 127.0.0.1:1883";
String clientId = "mqtt-client-1";
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(broker, clientId, persistence);
client.setCallback(new MqttCallback() {
    public void connectionLost(Throwable throwable) {}public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("receive message:" + s + "" + new String(mqttMessage.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});Copy the code

Connecting to the Proxy Server

MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("admin");
options.setPassword("123456".toCharArray());
client.connect(options);
Copy the code

news

String content = "hello mqtt";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(content.getBytes());
client.publish("jack", mqttMessage);
Copy the code

Subscribe to news

String topic = "testtopic";
client.subscribe(topic);
Copy the code

When there is a ‘testtopic’ theme after the message arrives at a proxy server, proxy server will forward the message to the client, the final performance for the callback MqttCallback. MessageArrived (String topic, MqttMessage message) method.

Paho design principles

The communication realized through socket will involve reading data from InputStream and writing data to OutputStream. The realization of such two-way communication requires the use of multi-threading technology, and also needs to consider the rational utilization of CPU resources, producers and consumers, etc.

Paho uses four threads for this communication, two for reading and two for writing. Read and write use a message queue to store the sent and received messages for message caching and forwarding, and use Java Object’s wait()/notify() to block/wake up threads to solve the producer-consumer problem and CPU resource waste problem.

Protocol implementation are mainly concentrated in the org. Eclipse. Paho. Client. Mqttv3. Internal. The wire package, protocol defines a total of 15 types of messages, using object-oriented programming, after encapsulation, abstraction MqttWireMessage is the base class message.

1=Invalid protocol version
2=Invalid client ID
3=Broker unavailable
4=Bad user name or password
5=Not authorized to connect
Copy the code

Paho prints information in multiple languages. Multilingual Message is implemented using Java’s ResourceBundle.

API. Libraries provide interfaces for external use.

Paho from a source code perspective

Source code interpretation, focusing on the network communication module, that is, to expand mqttClient.connect ().

org\eclipse\paho\client\mqttv3\MqttClient.java

public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
    aClient.connect(options, null.null).waitForCompletion(getTimeToWait());
}
Copy the code

The classes are then called all the way to the ClientComms class, which is a utility class that handles communication between the client and the server.

org\eclipse\paho\client\mqttv3\internal\ClientComms.java

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    final String methodName = "connect";
    synchronized (conLock) {
        if(isDisconnected() && ! closePending) { ... . ConnectBG conbg =new ConnectBG(this, token, connect, executorService);
            // Start the ConnectBG thread, that is, execute the connectbg.run () methodconbg.start(); }... . }}private class ConnectBG implements Runnable {
    public void run(a) {
        try{... .// The key flow of network communication is here
            NetworkModule networkModule = networkModules[networkModuleIndex];
            // 1 Create socket and connect to server
            networkModule.start();
            receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
            // 2 Starts the message receiving thread to read data sent by the server from the socket inputStream
            receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
            sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
            // 3 Starts the message sending thread, which continuously reads user publis messages from the vector and writes them to the socket outputStream
            sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
            // 4 Starts the callback thread, reads the data written by the message receiving thread from the vector, and calls back the user code
            callback.start("MQTT Call: "+getClient().getClientId(), executorService); internalSend(conPacket, conToken); }... . }}Copy the code

summary

Paho is a small open source project with only 96 Java files that aims to implement the MqTT protocol and encapsulate the interface for use by users. The project is very small, so the threshold is relatively low, can understand. Although the function is simple, but many common problems will be dealt with, such as the classic producer consumer problems, synchronous asynchronous problems, multi-threading problems, reading the source code will have some inspiration. In addition, the project is produced by Eclipse Factory, so I can learn the code design of The factory.