IoTDB is the first open source time-series database project initiated by Tsinghua University, and is now an Apache top-level project. IoTDB provides users with data collection, storage and analysis services. With its lightweight architecture, high performance, high availability, and seamless integration with the Hadoop and Spark ecosystems, it meets the needs of massive data storage, high-throughput writes, and complex query analysis in the industrial IoT space.

EMQ X is a large-scale, scalable, open source, cloud-native distributed IoT messaging middleware released by EMQ, an open source IoT data infrastructure software provider. EMQ X can efficiently and reliably handle massive numbers of concurrent IoT device connections and has a powerful built-in rule engine for high-performance real-time processing of event and message stream data. The rule engine provides flexible, configurable service integration through SQL statements, which simplifies business development, improves ease of use, and reduces the coupling between users' business logic and EMQ X.

This article describes how to use the MQTT data bridge feature of the EMQ X rule engine to receive data sent by an MQTT client and insert it into the time-series database IoTDB in real time.

Preparation

The software and environment used in the examples in this article:

  • Operating system: macOS
  • IoTDB: Binary package (Server), version 0.12.4
  • MQTT server: EMQ X Open Source v4.3.11
  • MQTT client software: MQTT X v1.6.0

IoTDB installation

First we need to download the binary package of IoTDB Server (standalone version) from the official page of IoTDB.

After downloading, decompress and enter the decompressed directory:

% ls
LICENSE         README.md       RELEASE_NOTES.md data             ext             licenses         sbin
NOTICE           README_ZH.md     conf             docs             lib             logs             tools

To enable MQTT protocol support in IoTDB, modify the IoTDB configuration file conf/iotdb-engine.properties:

* The data model used later in this article has a single storage group, root.sg. To increase write parallelism, set virtual_storage_group_num in iotdb-engine.properties to the number of CPU cores on the machine.

####################
### MQTT Broker Configuration
####################
# whether to enable the mqtt service.
enable_mqtt_service=true
# the mqtt service binding host.
mqtt_host=0.0.0.0
# the mqtt service binding port.
mqtt_port=2883
# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1
# the mqtt message payload formatter.
mqtt_payload_formatter=json
# max length of mqtt message in byte
mqtt_max_message_size=1048576

enable_mqtt_service is false by default and needs to be changed to true. mqtt_port defaults to 1883 and needs to be changed to 2883 to avoid a conflict with the port used by EMQ X.
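The two settings above can be checked before starting the server. The following is a minimal Python sketch, not part of IoTDB: the helper names parse_properties and check_mqtt_config and the sample text are made up for illustration, and the parser only handles plain key=value lines.

```python
# Minimal sanity check for the MQTT section of iotdb-engine.properties.
# The helper names and the sample text are illustrative, not part of IoTDB.

EMQX_MQTT_PORT = 1883  # EMQ X external TCP listener used in this article


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_mqtt_config(props):
    """Return a list of problems; an empty list means the section looks fine."""
    problems = []
    # IoTDB ships with the MQTT service disabled, so a missing key counts as false.
    if props.get("enable_mqtt_service", "false").lower() != "true":
        problems.append("enable_mqtt_service must be true")
    # The default port 1883 collides with EMQ X on the same machine.
    port = int(props.get("mqtt_port", "1883"))
    if port == EMQX_MQTT_PORT:
        problems.append("mqtt_port conflicts with EMQ X on 1883")
    return problems


sample = """
enable_mqtt_service=true
mqtt_host=0.0.0.0
mqtt_port=2883
"""
print(check_mqtt_config(parse_properties(sample)))
```

To check a real installation, read conf/iotdb-engine.properties from disk and pass its contents to parse_properties instead of the sample string.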

Run the ./sbin/start-server.sh command to start the IoTDB server:

% ./sbin/start-server.sh
---------------------
Starting IoTDB
---------------------
Maximum memory allocation pool = 2048MB, initial memory allocation pool = 512MB
If you want to change this configuration, please check conf/iotdb-env.sh(Unix or OS X, if you use Windows, check conf/iotdb-env.bat).
2022-01-10 14:15:31,914 [main] INFO  o.a.i.d.c.IoTDBDescriptor:121 - Start to read the config file file:./sbin/../conf/iotdb-engine.properties
...
2022-01-10 14:14:28,690 [main] INFO  o.a.i.d.s.UpgradeSevice:73 - Upgrade service stopped
2022-01-10 14:14:28,690 [main] INFO  o.a.i.db.service.IoTDB:153 - Congratulation, IoTDB is set up successfully. Now, enjoy yourself!
2022-01-10 14:14:28,690 [main] INFO  o.a.i.db.service.IoTDB:101 - IoTDB has started

Leave this terminal window open, and open a new terminal window to launch the IoTDB command-line client:

% ./sbin/start-cli.sh
---------------------
Starting IoTDB Cli
---------------------
 _____       _________  ______   ______
|_   _|     |  _   _  ||_   _ `.|_   _ \
  | |   .--.|_/ | | \_|  | | `. \ | |_) |
  | | / .'`\ \  | |      | |  | | |  __'.
 _| |_| \__. | _| |_    _| |_.' /_| |__) |
|_____|'.__.' |_____|  |______.'|_______/  version 0.12.4

IoTDB> login successfully
IoTDB>

The IoTDB environment is now ready. For more information about how to use IoTDB, please refer to the Get Started page on the official website.

Install and configure EMQ X

Download and start EMQ X

You can download the EMQ X Open Source edition for macOS directly from the command line. For other installation packages, please visit the EMQ X Open Source Edition download page.

% wget https://www.emqx.com/en/downloads/broker/4.3.11/emqx-macos-4.3.11-amd64.zip

Then unzip and start EMQ X:

% unzip -q emqx-macos-4.3.11-amd64.zip
% cd emqx
% ./bin/emqx console
log.to = "console"
Erlang/OTP 23 [erts-11.1.8] [emqx] [64-bit] [smp:8:8] [ds:8:8:8] [async-threads:4] [hipe]
Starting emqx on node [email protected]
Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
Start http:management listener on 8081 successfully.
Start http:dashboard listener on 18083 successfully.
EMQ X Broker 4.3.11 is running now!
Eshell V11.1.8 (abort with ^G)
([email protected])1>

Configure the rule

Open the EMQ X Dashboard in a browser and create a rule on the Rule Engine page:

The SQL statement is:

SELECT
    clientid,
    now_timestamp('millisecond') as now_ts_ms,
    payload.bar as bar
FROM
    "t/#"
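To make the effect of this SQL statement concrete, here is a rough Python emulation of what the rule selects for each incoming message. It is only a sketch of the semantics as described in this article, not EMQ X code: the functions topic_matches and apply_rule are hypothetical, the wildcard matching is simplified, and the timestamp passed in the example is an arbitrary illustrative value.

```python
import json
import time


def topic_matches(topic_filter, topic):
    """Simplified MQTT filter match: '+' is one level, '#' is all remaining levels."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            return True
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


def apply_rule(clientid, topic, payload, now_ms=None):
    """Emulate the SELECT: return the selected fields, or None if the topic is not matched."""
    if not topic_matches("t/#", topic):
        return None
    data = json.loads(payload)  # payload.bar implies a JSON payload
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # stands in for now_timestamp('millisecond')
    return {"clientid": clientid, "now_ts_ms": now_ms, "bar": data.get("bar")}


# Illustrative timestamp; in EMQ X it is taken when the rule runs.
print(apply_rule("abc", "t/1", '{"bar": 0.2}', now_ms=1641807632805))
```

The three keys of the returned dictionary correspond to the three columns of the SELECT clause, which are the values the action configured next can refer to.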

We then add a “bridge data to MQTT Broker” action to the rule at the bottom of the page:

To create an MQTT Bridge resource, click “New Resource” in the upper right corner:

The remote broker address is the MQTT service address of IoTDB, i.e. “127.0.0.1:2883”. Set the client ID, user name, and password all to root, because root is both the default user name and the default password of IoTDB.

For other options, keep the default values. Click the “Test Connection” button to make sure the configuration is correct, and then click the “New” button in the lower right corner to create the resource.

Back on the action creation page, the associated resource drop-down box is automatically filled with the resource we just created.

Now we continue to fill in more action parameters:

IoTDB doesn’t care about message topics, so we fill in an arbitrary topic: foo.

IoTDB requires the message content to be in JSON format, and the message content template can be filled in as shown below. For details, see the Communication Service Protocol documentation of IoTDB.

{
  "device": "root.sg.${clientid}",
  "timestamp": ${now_ts_ms},
  "measurements": [
    "bar"
  ],
  "values": [
    ${bar}
  ]
}

Note that “${clientid}”, “${now_ts_ms}” and “${bar}” are all variables extracted from the output of the rule's SQL statement, so you must make sure that each of them corresponds to a field in the SELECT clause of that statement.
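The substitution can be tried offline before saving the action. The sketch below uses Python's string.Template, which happens to use the same ${name} placeholder syntax; it is only an approximation of how EMQ X fills the template, and render_payload is a hypothetical helper. It renders the template from a rule output and then parses the result to confirm it is valid JSON.

```python
import json
from string import Template

# Same structure as the message content template in this article.
TEMPLATE = Template("""{
  "device": "root.sg.${clientid}",
  "timestamp": ${now_ts_ms},
  "measurements": ["bar"],
  "values": [${bar}]
}""")


def render_payload(rule_output):
    """Fill the placeholders; a field missing from the rule output raises KeyError."""
    text = TEMPLATE.substitute(rule_output)
    return json.loads(text)  # raises ValueError if the result is not valid JSON


# The timestamp here is an arbitrary illustrative value.
msg = render_payload({"clientid": "abc", "now_ts_ms": 1641807632805, "bar": 0.2})
print(msg["device"], msg["timestamp"], msg["values"])
```

Using substitute rather than safe_substitute is deliberate: a placeholder with no matching SELECT field fails loudly here instead of producing a malformed message.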

Now click Ok to save the action configuration and click New again to complete the rule creation.

Use MQTT Client to send messages

Next we use the MQTT client tool MQTT X to send messages to EMQ X:

MQTT X is a fully open source, cross-platform MQTT 5.0 desktop client released by EMQ. It supports quickly creating multiple simultaneous MQTT client connections, which makes it easy to test MQTT/TCP, MQTT/TLS and MQTT/WebSocket connections, publish and subscribe functions, and other MQTT protocol features.

In the connection parameters of the MQTT client, we only need to fill in one parameter, Client ID: “abc”, and keep the default values of the other parameters.

After the connection succeeds, we send two messages to the topic “t/1”, with the message content in the following format:

{
 "bar": 0.2
}
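The same two messages could also be sent from a script instead of MQTT X. The following Python sketch assumes the third-party paho-mqtt package is installed (pip install paho-mqtt) and that EMQ X is listening on 127.0.0.1:1883 as in this article; build_payload and publish_readings are hypothetical helper names, and the values 0.3 and 0.2 are the ones that appear in the query result later on.

```python
import json


def build_payload(bar):
    """Serialize one reading in the format the rule expects: {"bar": <number>}."""
    return json.dumps({"bar": bar})


def publish_readings(values, host="127.0.0.1", port=1883, client_id="abc"):
    """Publish each value to topic t/1 on EMQ X. Requires the paho-mqtt package."""
    import paho.mqtt.publish as publish  # third-party, imported lazily

    for value in values:
        # single() connects, publishes one message, and disconnects.
        publish.single(
            "t/1",
            build_payload(value),
            hostname=host,
            port=port,
            client_id=client_id,
        )


# With EMQ X running, uncomment to send the two readings:
# publish_readings([0.3, 0.2])
```

The client ID matters here because the rule copies it into the IoTDB device path, so "abc" yields root.sg.abc.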

Then go back to the Rule Engine page of the EMQ X Dashboard, check the rule's hit count, and confirm that the rule was triggered twice:

Finally, we return to the IoTDB client window of the command line terminal and use the following SQL statement to query the data:

IoTDB> SHOW TIMESERIES root.sg.abc
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|root.sg.abc.bar| null|     root.sg|   FLOAT| GORILLA|     SNAPPY|null|      null|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
Total line number = 1
It costs 0.006s

IoTDB> SELECT * FROM root.sg.abc
+-----------------------------+---------------+
|                         Time|root.sg.abc.bar|
+-----------------------------+---------------+
|2022-01-10T17:39:41.724+08:00|            0.3|
|2022-01-10T17:40:32.805+08:00|            0.2|
+-----------------------------+---------------+
Total line number = 2
It costs 0.007s
IoTDB>

Data inserted successfully!
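The Time column is the millisecond timestamp produced by now_timestamp('millisecond') in the rule, rendered in the local time zone of the CLI. As a small illustration, the Python sketch below converts an epoch-millisecond value into that display form. The function format_iotdb_time is hypothetical, the +08:00 offset is taken from the output above, and the input value was back-computed from the first row for this example rather than captured from the system.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def format_iotdb_time(ts_ms, utc_offset_hours=8):
    """Render epoch milliseconds in ISO 8601 form for a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    # Integer millisecond arithmetic avoids floating-point rounding.
    moment = EPOCH + timedelta(milliseconds=ts_ms)
    return moment.astimezone(tz).isoformat(timespec="milliseconds")


print(format_iotdb_time(1641807581724))  # 2022-01-10T17:39:41.724+08:00
```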

Conclusion

At this point, we have finished persisting messages to the IoTDB time-series database through the EMQ X rule engine.

In real production scenarios, we can use EMQ X to handle massive numbers of concurrent IoT device connections, process business logic flexibly through the rule engine, and persist the messages sent by devices to IoTDB. IoTDB can then be connected to Hadoop/Spark, Flink or Grafana for big data analysis and visualization.

The combination of EMQ X and IoTDB is a simple, efficient, scalable and highly available server-side integration solution, and a good choice for IoT device management and data processing scenarios.

Copyright: EMQ

Original link: www.emqx.com/zh/blog/sto…