Background
Using a common Internet of Things scenario as an example, this article shows how to use edge computing to process business data quickly, cheaply, and efficiently.
Many Internet of Things projects, such as smart building projects, need to collect and analyze building data (for example from elevators, gas, water, and electricity systems). One solution is to connect all devices directly to a cloud IoT platform such as Azure IoT Hub or AWS IoT Core. This approach has the following problems:
- Long data-processing latency: data has to travel over the Internet to the cloud to be processed, and the results have to travel back to the device, which takes time.
- Data transfer and storage costs: transmission over the Internet consumes bandwidth, and these costs can be considerable for large-scale IoT projects.
- Data security: some IoT data is highly sensitive, and sending all of it to the cloud is risky.
To solve these problems, the industry has proposed edge computing. Its core idea is to process data close to where it is produced, avoiding unnecessary latency, cost, and security risks.
The business scenario
Suppose you have a set of devices, each with its own ID, that send data over the MQTT protocol to their corresponding topics on an MQTT message server. The topics are designed as follows, where {device_id} is the device ID:
devices/{device_id}/messages
Each device sends the temperature and humidity data collected by the sensor in JSON format.
{
  "temperature": 30,
  "humidity": 20
}
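To make the topic scheme and payload format concrete, here is a minimal Python sketch that builds the topic for a device and serializes one reading. The helper names `build_topic` and `build_payload` are hypothetical; only the topic pattern and the JSON field names come from this article.

```python
import json

# Topic template from the scenario: {device_id} is replaced by each device's ID.
TOPIC_TEMPLATE = "devices/{device_id}/messages"


def build_topic(device_id: str) -> str:
    """Return the MQTT topic a given device publishes to (hypothetical helper)."""
    return TOPIC_TEMPLATE.format(device_id=device_id)


def build_payload(temperature: float, humidity: int) -> str:
    """Serialize one sensor reading as the JSON body each device sends."""
    return json.dumps({"temperature": temperature, "humidity": humidity})


if __name__ == "__main__":
    print(build_topic("1"))       # devices/1/messages
    print(build_payload(30, 20))  # {"temperature": 30, "humidity": 20}
```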
The data now needs to be analyzed in real time, with the following requirements: every 10 seconds, calculate the average temperature (t_av) of each device, and record the maximum (t_max), minimum (t_min), and number of data items (t_count) within those 10 seconds. The four results are then saved, as in the following example:
[
  {"device_id": "1", "t_av": 25, "t_max": 45, "t_min": 5, "t_count": 2},
  {"device_id": "2", "t_av": 25, "t_max": 45, "t_min": 5, "t_count": 2},
  ...
]
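The required statistics can be expressed as a plain function over the readings collected in one 10-second window. The sketch below groups readings by device ID and emits records in the output format above. It only illustrates the computation, not how Kuiper implements it, and the `(device_id, temperature)` input shape is an assumption.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def window_stats(readings: List[Tuple[str, float]]) -> List[Dict]:
    """Compute t_av, t_max, t_min and t_count per device for one window.

    `readings` is a list of (device_id, temperature) pairs that all fall
    into the same 10-second window (an assumed input shape).
    """
    by_device: Dict[str, List[float]] = defaultdict(list)
    for device_id, temperature in readings:
        by_device[device_id].append(temperature)

    results = []
    for device_id, temps in by_device.items():  # insertion order is kept
        results.append({
            "device_id": device_id,
            "t_av": sum(temps) / len(temps),
            "t_max": max(temps),
            "t_min": min(temps),
            "t_count": len(temps),
        })
    return results


if __name__ == "__main__":
    # Two readings per device reproduce the example record: 25, 45, 5, 2.
    sample = [("1", 45), ("1", 5), ("2", 45), ("2", 5)]
    print(window_stats(sample))
```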
Solution overview
As shown in the figure below, we use edge analysis (streaming data processing): EMQ X components run at the edge, and the calculation results are finally sent to Azure IoT Hub.
- EMQ X Edge can connect devices that use various protocols, such as MQTT, CoAP, and LwM2M, so users do not need to worry about protocol adaptation. It is also lightweight enough to be deployed on edge devices.
- EMQ X Kuiper is a lightweight, SQL-based edge streaming analytics engine released by EMQ. Its installation package is only about 7 MB, which makes it well suited to running on edge devices.
- Azure IoT Hub provides a comprehensive solution for device access and data analysis. Here it receives the result data in the cloud, where applications can analyze it further.
Implementation steps
Install EMQ X Edge & Kuiper
- As of this writing, the latest version of EMQ X Edge is 4.0. You can install and start EMQ X Edge with Docker:
# docker pull emqx/emqx-edge
# docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:latest
# docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                     NAMES
a348e3ac150c   emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago   Up 2 seconds   4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx
You can use telnet to check whether EMQ X Edge started successfully:
# telnet localhost 1883
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
- Install and start Kuiper
Download the latest version of Kuiper and unzip it. At the time of this writing, the latest version of Kuiper is 0.0.3.
# unzip kuiper-linux-amd64-0.0.3.zip
# cd kuiper
# bin/server
Serving Kuiper server on port 20498
If it cannot be started, check the log file log/stream.log.
Create a stream
Kuiper provides a command-line tool for managing streams and rules. Run bin/cli in a terminal to see its subcommands and their help. By default, the CLI connects to the local Kuiper server, but it can also connect to other Kuiper servers; the server it connects to is configured in etc/client.yaml. For more information, see the Kuiper CLI documentation.
Create a stream definition: creating a stream defines the format of the data that flows through it, much like defining the structure of a table in a relational database. See the Kuiper documentation for all supported data types.
# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
This creates a stream named demo with two fields, temperature and humidity, whose data source is the MQTT topic devices/+/messages. Note the single-level wildcard +, which lets the stream subscribe to the messages of all devices. The MQTT server address for the data source is set with the servers item in the etc/mqtt_source.yaml configuration file, as follows:
# Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]
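The + in devices/+/messages is MQTT's single-level wildcard: it matches exactly one topic level. The following Python sketch of topic-filter matching (covering + and the multi-level #) shows which topics the demo stream receives. It is a simplified illustration of the MQTT 3.1.1 matching rules, not code from EMQ X or Kuiper, and it ignores edge cases such as topics that begin with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching for '+' and '#' wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is always the last level and matches the parent level
            # plus any number of child levels.
            return True
        if i >= len(topic_levels):
            return False  # topic is shorter than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Every filter level matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for t in ["devices/1/messages", "devices/1/2/messages", "devices/messages"]:
        print(t, topic_matches("devices/+/messages", t))
```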
Use the describe subcommand to view the newly created stream definition:
# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature     float
humidity        bigint

FORMAT: JSON
DATASOURCE: devices/+/messages
Business logic processing
Kuiper implements business logic in SQL. The following SQL calculates the average, maximum, minimum, and count of the temperature every 10 seconds, grouped by device ID:
SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
The SQL uses four aggregate functions to compute statistics within each 10-second window:
- avg: the average
- max: the maximum
- min: the minimum
- count: the number of items
It also uses two basic functions:
- mqtt: retrieves MQTT protocol information; mqtt(topic) returns the topic of the current message.
- split_value: splits the first argument by the second argument, and the third argument selects which element of the result to return (by index). So the call split_value("devices/001/messages", "/", 1) returns 001.
The GROUP BY clause groups by device_id and by the time window TUMBLINGWINDOW(ss, 10), which means a batch of statistics is produced every 10 seconds.
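To see what the two functions and the tumbling window do to each message, here is a small Python model. `split_value` mirrors the behavior described above (split, then index). `window_start` assumes tumbling windows are aligned to multiples of the window length, which is an assumption for illustration rather than documented Kuiper behavior. Both functions and `group_key` are stand-ins, not Kuiper's implementation.

```python
def split_value(value: str, sep: str, index: int) -> str:
    """Python stand-in for Kuiper's split_value(value, sep, index)."""
    return value.split(sep)[index]


def window_start(timestamp_ms: int, size_s: int = 10) -> int:
    """Start (ms) of the tumbling window containing timestamp_ms.

    Assumes windows are aligned to multiples of the window size, so
    every message falls into exactly one non-overlapping window.
    """
    size_ms = size_s * 1000
    return timestamp_ms - timestamp_ms % size_ms


def group_key(topic: str, timestamp_ms: int) -> tuple:
    """Hypothetical key for GROUP BY device_id, TUMBLINGWINDOW(ss, 10)."""
    return (split_value(topic, "/", 1), window_start(timestamp_ms))


if __name__ == "__main__":
    print(group_key("devices/001/messages", 12_345))  # ('001', 10000)
```

Messages that share a key end up in the same output record; once a window closes, its keys are never reused.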
Debugging SQL
Before writing the rule, debug the SQL. Kuiper provides a SQL debugging tool that makes this convenient.
- Go to the Kuiper installation directory and run bin/cli query.
- Enter the prepared SQL statement at the prompt that appears.
# bin/cli query
Connecting to 127.0.0.1:20498
kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
query is submit successfully.
kuiper >
In the log file log/stream.log, you can see that a temporary rule named internal-kuiper_query_rule was created.
...
time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
Note that the rule internal-kuiper_query_rule is created by query. The server checks every 5 seconds whether the query client is still online. If the query client does not respond for more than 10 seconds (for example, because it has been closed), internal-kuiper_query_rule is deleted automatically, and the following is printed in the log file:
...
time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
...
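The cleanup behavior described above can be modeled as a periodic liveness check. The Python sketch below is a hypothetical model of that logic (the class and method names are invented for illustration). It encodes only the two numbers from the text, a 5-second check interval and a 10-second timeout, and is not Kuiper's source code.

```python
CHECK_INTERVAL_S = 5   # how often the server checks the query client
TIMEOUT_S = 10         # silence longer than this stops the temporary rule


class QueryRuleMonitor:
    """Hypothetical model of the temporary query rule's liveness check."""

    def __init__(self, now: float):
        self.last_seen = now
        self.running = True

    def heartbeat(self, now: float) -> None:
        """Record that the query client fetched results at time `now`."""
        self.last_seen = now

    def check(self, now: float) -> bool:
        """Run one periodic check; stop the rule if the client went quiet."""
        if self.running and now - self.last_seen > TIMEOUT_S:
            self.running = False  # corresponds to "stop the query" in the log
        return self.running


if __name__ == "__main__":
    monitor = QueryRuleMonitor(now=0)
    monitor.heartbeat(3)  # client last seen at t=3 s, then closed
    for t in range(CHECK_INTERVAL_S, 30, CHECK_INTERVAL_S):
        if not monitor.check(t):
            print(f"t={t}s: client silent for {t - monitor.last_seen}s, stop the query")
            break
```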
- Send test data
Send the following test data to EMQ X Edge with any test tool. The author used the JMeter MQTT plugin, because JMeter makes it easy to generate data automatically, control the test logic, and simulate a large number of devices. You can also use mosquitto or other MQTT clients to simulate the data.
- Topic: devices/$device_id/messages, where $device_id is the first column in the data below
- Message: {"temperature": $temperature, "humidity": $humidity}, where $temperature and $humidity are the second and third columns in the data below
#device_id, temperature, humidity
1,20,30
2,31,40
1,35,50
2,20,30
1,80,90
2,45,20
1,10,90
2,12,30
1,65,35
2,55,32
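If you prefer to script the test rather than use JMeter, converting data rows into MQTT messages is straightforward. The Python sketch below turns each `device_id,temperature,humidity` row into a topic and a JSON payload following the rules above. It embeds ten test rows with device IDs alternating between 1 and 2, which is consistent with the results shown for this test. It stops short of publishing; sending the messages would need an MQTT client library such as paho-mqtt, which is not shown here.

```python
import json
from typing import List, Tuple

# device_id, temperature, humidity (the ten test rows)
ROWS = """\
1,20,30
2,31,40
1,35,50
2,20,30
1,80,90
2,45,20
1,10,90
2,12,30
1,65,35
2,55,32"""


def rows_to_messages(text: str) -> List[Tuple[str, str]]:
    """Map each CSV row to an (MQTT topic, JSON payload) pair."""
    messages = []
    for line in text.splitlines():
        device_id, temperature, humidity = line.split(",")
        topic = f"devices/{device_id}/messages"
        payload = json.dumps({"temperature": int(temperature),
                              "humidity": int(humidity)})
        messages.append((topic, payload))
    return messages


if __name__ == "__main__":
    for topic, payload in rows_to_messages(ROWS):
        print(topic, payload)
```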
After the mock data is sent, the query client prints two sets of results, one for each of two 10-second time windows. The number of results depends on how fast the data is sent: if Kuiper receives all of the data within one time window, it prints only one result.
kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]
[{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
Create and submit rules
After debugging the SQL, configure the rule file to send the results to the remote Azure IoT Hub through Kuiper's MQTT sink. In Azure IoT Hub, you need to create the following:
- IoT Hub: used for device access. The hub created in this article is named rockydemo.
- IoT Device: represents a device; here it is the gateway that processes device data. Kuiper is installed on the gateway, which processes the data and sends the results to Azure.
- Device connection user name and password: refer to the Azure documentation for the user name and password of an Azure IoT MQTT connection, and to the Azure documentation on generating SAS tokens.
The device is created in the Azure IoT Hub as shown below.
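The MQTT user name and password for an Azure IoT device follow a fixed pattern. The Python sketch below assembles both, adapting the SAS-token recipe from Azure's IoT Hub security documentation: the resource URI `{hub}.azure-devices.net/devices/{device_id}` is URL-encoded and signed together with an expiry timestamp using HMAC-SHA256 and the device's base64-encoded key. The function names are hypothetical, the hub name, device ID, and key are placeholders, and the exact token rules should be checked against the current Azure documentation.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional
from urllib.parse import quote_plus, urlencode


def mqtt_username(hub: str, device_id: str) -> str:
    """User name in the format used by this article's rule file."""
    return f"{hub}.azure-devices.net/{device_id}/?api-version=2018-06-30"


def sas_token(hub: str, device_id: str, device_key_b64: str,
              expiry: Optional[int] = None) -> str:
    """Build a device-scoped SAS token to use as the MQTT password."""
    uri = f"{hub}.azure-devices.net/devices/{device_id}"
    if expiry is None:
        expiry = int(time.time()) + 3600  # valid for one hour
    string_to_sign = f"{quote_plus(uri)}\n{expiry}"
    signature = base64.b64encode(
        hmac.new(base64.b64decode(device_key_b64),
                 string_to_sign.encode("utf-8"),
                 hashlib.sha256).digest())
    # urlencode percent-encodes sr and sig, as the token format requires.
    return "SharedAccessSignature " + urlencode(
        {"sr": uri, "sig": signature, "se": str(expiry)})


if __name__ == "__main__":
    fake_key = base64.b64encode(b"not-a-real-key").decode()  # placeholder
    print(mqtt_username("rockydemo", "demo_001"))
    print(sas_token("rockydemo", "demo_001", fake_key))
```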
Write the Kuiper rule file
The rule file is a text file that describes the business logic (the SQL statement debugged earlier) and the sink configuration (the destination of the processing results). Most of the information needed to connect to Azure IoT Hub was described in the previous section. Note that protocol_version must be set to 3.1.1, not 3.1.
{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {"log": {}},
    {"mqtt": {
      "server": "ssl://rockydemo.azure-devices.net:8883",
      "topic": "devices/demo_001/messages/events/",
      "protocol_version": "3.1.1",
      "qos": 1,
      "clientId": "demo_001",
      "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
      "password": "SharedAccessSignature sr=*******************"
    }}
  ]
}
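Because the SQL contains double quotes, editing the rule file by hand is error-prone. One option is to generate it: the Python sketch below writes rule1.txt with `json.dump`, which escapes the quotes inside the SQL and keeps protocol_version as the string "3.1.1". The password is left as a masked placeholder, and generating the file this way is a convenience, not something Kuiper requires.

```python
import json

SQL = ('SELECT avg(temperature) AS t_av, max(temperature) AS t_max, '
       'min(temperature) AS t_min, COUNT(*) AS t_count, '
       'split_value(mqtt(topic), "/", 1) AS device_id '
       'FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)')

rule = {
    "sql": SQL,
    "actions": [
        {"log": {}},
        {"mqtt": {
            "server": "ssl://rockydemo.azure-devices.net:8883",
            "topic": "devices/demo_001/messages/events/",
            "protocol_version": "3.1.1",  # must be 3.1.1, not 3.1
            "qos": 1,
            "clientId": "demo_001",
            "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
            "password": "SharedAccessSignature sr=*******************",  # placeholder
        }},
    ],
}

if __name__ == "__main__":
    # Write the rule file consumed by: bin/cli create rule rule1 -f rule1.txt
    with open("rule1.txt", "w", encoding="utf-8") as f:
        json.dump(rule, f, indent=2)
```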
Create rules from the Kuiper command line
# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt.
Rule rule1 was created.
The log file shows the rule's connection status. If the configuration is correct, you should see that the connection to Azure IoT Hub was established successfully.
...
time="2019-11-12T14:30:34+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-12T14:30:34+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-12T14:30:35+08:00" level=info msg="The connection to server ssl://rockydemo.azure-devices.net:8883 was established successfully" rule=rule1
......
- Start monitoring Azure IoT Hub with the az iot hub monitor-events -n rockydemo command, then send simulated data to the local EMQ X Edge, as when debugging the SQL statement. After Kuiper processes the data, the results are sent to Azure IoT Hub:
# az iot hub monitor-events -n rockydemo
Starting event monitor, use ctrl-c to stop...
{
  "event": {
    "origin": "demo_001",
    "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]"
  }
}
{
  "event": {
    "origin": "demo_001",
    "payload": "[{\"device_id\":\"2\",\"t_av\":33.5,\"t_count\":2,\"t_max\":55,\"t_min\":12},{\"device_id\":\"1\",\"t_av\":37.5,\"t_count\":2,\"t_max\":65,\"t_min\":10}]"
  }
}
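In the monitor output, payload appears as a JSON-encoded string, so a consumer reading these events has to decode it a second time. A minimal Python sketch, using the first event printed by the monitor as input; `results_by_device` is a hypothetical helper name.

```python
import json

# First event from the monitor output (payload is itself a JSON string).
EVENT = {
    "event": {
        "origin": "demo_001",
        "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},"
                   "{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]",
    }
}


def results_by_device(event: dict) -> dict:
    """Decode the inner payload and index the window results by device ID."""
    records = json.loads(event["event"]["payload"])
    return {r["device_id"]: r for r in records}


if __name__ == "__main__":
    for device_id, r in sorted(results_by_device(EVENT).items()):
        print(device_id, r["t_av"], r["t_min"], r["t_max"], r["t_count"])
```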
Conclusion
This article has shown that with EMQ X at the edge, you can develop an edge data analysis system quickly and flexibly, and process data with low latency, at low cost, and securely. Azure IoT also provides its own IoT Edge solution. Compared with Azure's solution:
- Kuiper's runtime is very lightweight, whereas Azure IoT Edge requires runtimes for the relevant languages, so its installation package can be relatively large.
- Implementing business logic in SQL makes development with Kuiper fast and simple, but it lacks flexibility for complex business logic; Azure IoT Edge is relatively flexible in how business logic is implemented.
- Kuiper is more flexible when integrating with third-party IoT hubs, whereas Azure IoT Edge connects only to Azure IoT Hub.
If you are interested in learning more about edge streaming data analysis, please refer to the Kuiper open source project.
For more information, visit our official website emqx.io or follow our open source project github.com/emqx/emqx. For more details, see our official documentation.