Introduction:

DolphinDB is a high-performance distributed time series database developed by Zhejiang Zhiyuo Technology Co., LTD. It combines distributed storage, distributed computing, streaming computing and programming languages to provide customers with a lightweight, one-stop big data solution, especially suitable for quantitative finance and industrial Internet of Things. DolphinDB provides multiple API access methods and supports multiple data injection methods such as MQTT, OPC, and KAFKA. As DolphinDB developed, more manufacturers started working with DolphinDB in depth.

Hangzhou Yingyun Technology Co., Ltd. is a leading enterprise in the field of message and flow processing for 5G and Internet of Things market. Recently, the technical team of Yingyun technology developed the DolphinDB API independently, using its own Erlang programming language and DolphinDB data access protocol. DolphinDB can be two-way data transmission with TCP. DolphinDB provides a new way for developers and users to interact with DolphinDB.

EMQ X (EMQ for short) is a fully open source, highly scalable, highly available distributed MQTT messaging server that also supports CoAP/LwM2M one-stop IoT protocol access. EMQ is the messaging engine for the Internet of Everything in the 5G era. It is suitable for IoT, M2M and mobile applications and can handle tens of millions of concurrent clients.

# Build and configure DolphinDB

Currently, the EMQ X only works with DolphinDB versions 1.20.7 and above.

Linux version, for example, to’s official website to download the latest version of the community on the installation package: www.dolphindb.cn/downloads.h…

Upload the server directory of the installation package to the server directory /opts/app/ Dolphindb and check whether the package starts properly.

Chmod +x./ Dolphindb./ Dolphindb ## When the command is successfully started, the system goes to the dolphindb command line and executes 1+1 >1+1 2Copy the code

DolphinDB is successfully installed if DolphinDB is successfully started and output is correct. Then close DolphinDB with

.
+d>

Now we need to open DolphinDB’s StreamTable publish/subscribe and create tables to store and persist EMQ X messages:

  1. Modify the DolphinDB configuration file vim Dolphindb. CFG to add the following configuration items to enable the publish/subscribe function:

    Publisher for streaming

    maxPubConnections=10 persistenceDir=/ddb/pubdata/ #persistenceWorkerNum= #maxPersistenceQueueDepth= #maxMsgNumPerBlock= #maxPubQueueDepthPerSite=

    Subscriber for streaming

    subPort=8000 #subExecutors= #maxSubConnections= #subExecutorPooling= #maxSubQueueDepth=

2. Start DolphinDB service in the background:

DolphinDB listens for port 8848 when the client starts. nohup ./dolphindb -console 0 &Copy the code

3. Go to the DolphinDB website and download the appropriate GUI client to connect to the DolphinDB service:

  • To travel toDownload page (Opens New Window)downloadDolphinDB GUI
  • DolphinDB GUI clients rely on the Java environment. Make sure Java is installed
  • Go to the DolphinDB GUI directory and execute itsh gui.shStart the client
  • Add the Server to the client and create a Project and script file.

Create distributed database and StreamTable table; And persist StreamTable data to a distributed table:

// Create a distributed file database named emqx // create a table named 'MSG', partition according to the HASH value of 'clientid' and 'topic' :  schema = table(1:0, `clientid`topic`qos`payload, [STRING, STRING, INT, STRING]) db1 = database("", HASH, [STRING, 8]) db2 = database("", HASH, [STRING, 8]) db = database("dfs://emqx", COMPO, [db1, Db2]) db. CreatePartitionedTable (schema, "MSG", ` clientid ` topic) / / create a ` st_msg ` StreamTable list, and the persistence of data to a ` MSG ` table. share streamTable(10000:0,`clientid`topic`qos`payload, [STRING,STRING,INT,STRING]) as st_msg msg_ref= loadTable("dfs://emqx", "msg") subscribeTable(, "st_msg", "Save_msg_to_dfs ", 0, msg_ref, true) Select * from msg_ref;Copy the code

When done, you can see that an empty MSG_ref has been created successfully:

DolphinDB configuration is complete.

For detailed DolphinDB documentation, see:

  • User Guide: github.com/dolphindb/T…
  • IoT scenario example: gitee.com/dolphindb/T…
  • Flow handling Guide: github.com/dolphindb/T…
  • Manual programming: www.dolphindb.cn/cn/help/ind…

Configure the rules engine

Create a rule:

Open the EMQ X Dashboard (Opens New Window) and select the “Rules” TAB on the left.

SQL:

SELECT * FROM "t/#"
Copy the code

Associated actions:

On the Response Actions screen, select Add. In the Actions drop-down list, select Save data to DolphinDB.

Fill in action parameters:

The save data to DolphinDB action takes two parameters:

1). SQL template. SQL > insert into table st_msg;

insert into st_msg values(${clientid}, ${topic}, ${qos}, ${payload})
Copy the code

2). ID of the associated resource. A DolphinDB resource can now be created by clicking “New Resource” in the upper right corner.

Fill in the resource configuration:

Server Address Indicates the DolphinDB server deployed above. The user name is admin and the password is 123456

Click the ok button.

Return to the response action screen and click OK.

Return to the rule creation page and click Create.

In the rule list, click the “View” button or the rule ID link to preview the rule you just created:

Now that the rule has been created, send a message:

Topic: "t/a"
QoS: 1
Payload: "hello"
Copy the code

Then check the persistent MSG_dFS table to see if the new data was added successfully: