Preface

IoT technology has been getting a lot of attention lately, and we have a project in this area. While researching, we found that EMQX and TDengine both target IoT and both offer open source editions. It turned out, however, that only the EMQX Enterprise edition can write data to TDengine automatically; the open source edition has that feature removed. So we built the SET project, a Java project based on Spring Boot. Its main job is to subscribe to EMQX messages and write them to TDengine. It relies only on features of the open source editions, so it can be used with confidence.

Basic architecture

The architecture is kept to the essentials: this project subscribes to messages from EMQX and writes them to TDengine.
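As a rough mental model of that flow, here is a minimal in-memory sketch. Everything in it (`PipelineSketch`, `FakeBroker`, `FakeStore`, `Reading`) is a hypothetical stand-in invented for illustration; the real project uses the Paho MQTT client and MyBatis mappers shown later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative stand-ins only; none of these types exist in the project. */
final class PipelineSketch {

    /** One decoded sensor reading (hypothetical shape: temperature and humidity). */
    static final class Reading {
        final int temperature;
        final float humidity;

        Reading(int temperature, float humidity) {
            this.temperature = temperature;
            this.humidity = humidity;
        }
    }

    /** Stand-in for the broker: hands every published reading to one subscriber. */
    static final class FakeBroker {
        private Consumer<Reading> subscriber = r -> { };

        void subscribe(Consumer<Reading> s) {
            this.subscriber = s;
        }

        void publish(Reading r) {
            subscriber.accept(r);
        }
    }

    /** Stand-in for the database writer: collects rows in memory. */
    static final class FakeStore {
        final List<Reading> rows = new ArrayList<>();

        void save(Reading r) {
            rows.add(r);
        }
    }

    /** Wires the subscription to the writer, which is essentially all the bridge does. */
    static FakeStore wire(FakeBroker broker) {
        FakeStore store = new FakeStore();
        broker.subscribe(store::save);
        return store;
    }
}
```

Publishing two readings to the fake broker leaves two rows in the fake store; the real service does the same thing with a network connection on each side.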

Core code

Code structure

Dependencies


      
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zew</groupId>
    <artifactId>iot-emqx-tdengine</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>spring-boot-demo</name>
    <description>Demo project for using TDEngine with Spring Boot And EMQX</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- taos-jdbc -->
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>Mid-atlantic moved</version>
        </dependency>

        <!-- druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-annotations</artifactId>
            <version>1.5.22</version>
        </dependency>
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-models</artifactId>
            <version>1.5.22</version>
        </dependency>

        <!-- Slf4j -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
            </resource>

        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Model layer

package com.zew.demo.iot.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;

/**
 * @author Administrator
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Weather {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
    private Timestamp ts;

    private int temperature;

    private float humidity;

}
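To make the `@JsonFormat` pattern on `ts` concrete, the sketch below formats a `Timestamp` with the same pattern and time zone using plain `SimpleDateFormat`. It only approximates what the JSON serializer would emit; `TimestampFormatSketch` is a made-up name for illustration.

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

final class TimestampFormatSketch {

    /** Formats a timestamp with the pattern and zone used on Weather.ts. */
    static String format(Timestamp ts) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        return fmt.format(ts);
    }

    public static void main(String[] args) {
        // Epoch zero rendered eight hours ahead of UTC.
        System.out.println(format(new Timestamp(0L))); // prints 1970-01-01 08:00:00.000
    }
}
```

The fixed `GMT+8` offset means the rendered string does not depend on the server's default time zone.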

Service layer

EMQX listener module

package com.zew.demo.iot.service;

import com.alibaba.fastjson.JSONObject;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Slf4j
@Service
public class MessageListenerService {
    @Value("${broker.ip:10.0.4.214}")
    private String brokerIp;
    @Value("${subscribe.topic:/#}")
    private String topic;
    @Autowired
    private WeatherService weatherService;

    @PostConstruct
    public void ini() {
        try {
            String host = InetAddress.getLocalHost().getHostAddress();
            MqttClient client = new MqttClient("tcp://" + brokerIp + ":1883", host + System.currentTimeMillis(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(15);
            options.setUserName("userName");
            options.setPassword("userName".toCharArray());
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    log.warn("Disconnection", cause);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    log.debug("Message delivery {}, {}", topic, message);
                    byte[] payload = message.getPayload();
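                    // decode explicitly as UTF-8 instead of relying on the platform default charset
                    JSONObject jsonObject = JSONObject.parseObject(new String(payload, java.nio.charset.StandardCharsets.UTF_8));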
                    Weather weather=new Weather();
                    weather.setTemperature(jsonObject.getInteger("temperature"));
                    weather.setHumidity(jsonObject.getFloat("humidity"));
                    weatherService.save(weather);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.debug("Message complete {}", token);
                }
            });
            client.connect(options);
            client.subscribe(topic, 1);
        } catch (UnknownHostException | MqttException e) {
            log.error("MQTT error connecting to broker", e);
        }
    }
}
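The listener's default subscription is `/#`. To illustrate what such a filter covers, here is a simplified sketch of MQTT topic-filter matching, where `+` stands for exactly one level and `#` for all remaining levels. `TopicFilterSketch` and `matches` are made-up names, not part of Paho or EMQX, and the sketch ignores special cases such as `$`-prefixed system topics.

```java
final class TopicFilterSketch {

    /** Returns true if the topic is covered by the filter (simplified MQTT rules). */
    static boolean matches(String filter, String topic) {
        // Keep empty levels: "/a" splits into ["", "a"].
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard is only valid as the last level.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/#", "/weather/room1")); // prints true
        System.out.println(matches("/#", "weather/room1"));  // prints false
        System.out.println(matches("#", "weather/room1"));   // prints true
    }
}
```

Under these rules `/#` only covers topics that begin with a slash, so a device publishing to `weather/room1` would be missed; setting `subscribe.topic` to `#` covers both forms.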

TDengine operation module

package com.zew.demo.iot.service;

import com.zew.demo.iot.mapper.DatabaseMapper;
import com.zew.demo.iot.mapper.WeatherMapper;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author Administrator
 */
@Slf4j
@Service
public class WeatherService {

    private final DatabaseMapper databaseMapper;
    private final WeatherMapper weatherMapper;

    public WeatherService(DatabaseMapper databaseMapper, WeatherMapper weatherMapper) {
        this.databaseMapper = databaseMapper;
        this.weatherMapper = weatherMapper;
    }

    public boolean init() {
        try {
            // drop the database
            databaseMapper.dropDatabase("db");
            // create the database (simple alternative: databaseMapper.createDatabase("db"))
            Map<String, String> map = new HashMap<>(4);
            map.put("dbName", "db");
            map.put("keep", "36500");
            map.put("days", "30");
            map.put("blocks", "4");
            databaseMapper.creatDatabaseWithParameters(map);
            // select the database
            databaseMapper.useDatabase("db");
            // create the table
            weatherMapper.createTable("db", "weather");
            return true;
        } catch (Exception e) {
            log.error("Error creating data table", e);
        }
        return false;
    }

    public int save(Weather weather) {
        return weatherMapper.insert(weather);
    }

    public int batchSave(List<Weather> weatherList) {
        return weatherMapper.batchInsert(weatherList);
    }

    public List<Weather> query(Long limit, Long offset) {
        return weatherMapper.select(limit, offset);
    }
}
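The parameter map built in `init()` ends up as a single CREATE DATABASE statement. The sketch below assembles that statement in plain Java, following the option order used in the mapper XML, to show roughly what gets sent. The class name and the validation rules are assumptions added for illustration: because the mapper uses `${...}` text substitution rather than bound parameters, a guard like this may be worth having if the values ever come from outside.

```java
import java.util.Locale;
import java.util.Map;

final class CreateDatabaseSqlSketch {

    // Option keys in the order the mapper XML emits them.
    private static final String[] OPTIONS =
            {"keep", "days", "replica", "cache", "blocks", "minrows", "maxrows"};

    /** Builds the statement text; rejects values that are not plain identifiers or digits. */
    static String build(Map<String, String> params) {
        String dbName = params.get("dbName");
        if (dbName == null || !dbName.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("unsafe database name: " + dbName);
        }
        StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ").append(dbName);
        for (String option : OPTIONS) {
            String value = params.get(option);
            if (value == null) {
                continue; // same effect as the <if test="... != null"> guards
            }
            if (!value.matches("\\d+")) {
                throw new IllegalArgumentException("non-numeric value for " + option + ": " + value);
            }
            sql.append(' ').append(option.toUpperCase(Locale.ROOT)).append(' ').append(value);
        }
        return sql.toString();
    }
}
```

With the values from `init()` this yields `CREATE DATABASE IF NOT EXISTS db KEEP 36500 DAYS 30 BLOCKS 4`.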

DAO layer

package com.zew.demo.iot.mapper;

import com.zew.demo.iot.model.Weather;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author Administrator
 */
@Component
@Mapper
public interface WeatherMapper {

    /**
     * Create a table.
     *
     * @param dbName    database name
     * @param tableName table name
     */
    void createTable(String dbName, String tableName);

    /**
     * Insert a single row.
     *
     * @param weather the weather data
     * @return number of affected rows
     */
    int insert(Weather weather);

    /**
     * Insert rows in batches.
     *
     * @param weatherList data list
     * @return number of affected rows
     */
    int batchInsert(List<Weather> weatherList);

    /**
     * Query data.
     *
     * @param limit  maximum number of rows
     * @param offset the offset
     * @return the matching rows
     */
    List<Weather> select(@Param("limit") Long limit, @Param("offset") Long offset);
}

Working with TDengine can involve frequent database-level operations (create, drop, use), so these are wrapped in their own mapper as well.

package com.zew.demo.iot.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author Administrator
 */
@Component
@Mapper
public interface DatabaseMapper {

    /**
     * Create a database.
     *
     * @param dbName database name
     * @return number of affected rows
     */
    int createDatabase(String dbName);

    /**
     * Create a database with configuration parameters.
     *
     * @param map the parameters
     * @return number of affected rows
     */
    int creatDatabaseWithParameters(Map<String, String> map);

    /**
     * Delete a database.
     *
     * @param dbName database name
     * @return number of affected rows
     */
    int dropDatabase(String dbName);

    /**
     * Select a database.
     *
     * @param dbName database name
     * @return number of affected rows
     */
    int useDatabase(String dbName);

}

Mapper XML files


      
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.zew.demo.iot.mapper.DatabaseMapper">

    <update id="createDatabase" parameterType="java.lang.String">
        create database if not exists ${dbName}
    </update>

    <update id="creatDatabaseWithParameters" parameterType="map">
        CREATE database if not EXISTS ${dbName}
        <if test="keep != null">
            KEEP ${keep}
        </if>
        <if test="days != null">
            DAYS ${days}
        </if>
        <if test="replica != null">
            REPLICA ${replica}
        </if>
        <if test="cache != null">
            cache ${cache}
        </if>
        <if test="blocks != null">
            blocks ${blocks}
        </if>
        <if test="minrows != null">
            minrows ${minrows}
        </if>
        <if test="maxrows != null">
            maxrows ${maxrows}
        </if>
    </update>

    <update id="dropDatabase" parameterType="java.lang.String">
        DROP database if exists ${dbName}
    </update>

    <update id="useDatabase" parameterType="java.lang.String">
        use ${dbName}
    </update>

</mapper>

      
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.zew.demo.iot.mapper.WeatherMapper">

    <resultMap id="BaseResultMap" type="com.zew.demo.iot.model.Weather">
        <id column="ts" jdbcType="TIMESTAMP" property="ts"/>
        <result column="temperature" jdbcType="INTEGER" property="temperature"/>
        <result column="humidity" jdbcType="FLOAT" property="humidity"/>
    </resultMap>

    <update id="createTable">
        create table if not exists ${dbName}.${tableName}(ts timestamp, temperature int, humidity float);
    </update>

    <update id="dropTable" parameterType="java.lang.String">
        drop table if exists ${tableName}
    </update>

    <insert id="insert" parameterType="com.zew.demo.iot.model.Weather">
        insert into db.weather (ts,temperature, humidity) values (now, #{temperature,jdbcType=INTEGER}, #{humidity,jdbcType=FLOAT})
    </insert>

    <insert id="batchInsert" parameterType="java.util.List">
        insert into db.weather(ts,temperature, humidity) values
        <foreach separator=" " collection="list" item="weather" index="index">
            (now + #{index}a, #{weather.temperature}, #{weather.humidity})
        </foreach>
    </insert>

    <sql id="Base_Column_List">
        ts, temperature, humidity
    </sql>

    <select id="select" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from db.weather
        order by ts desc
        <if test="limit != null">
            limit #{limit,jdbcType=BIGINT}
        </if>
        <if test="offset != null">
            offset #{offset,jdbcType=BIGINT}
        </if>
    </select>
</mapper>
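The `batchInsert` statement adds the row index, in milliseconds (the `a` suffix), to `now`. TDengine uses the timestamp column as the table's primary key, which is presumably why each row in a batch gets a distinct offset. The sketch below shows roughly what the expanded statement looks like; `BatchInsertSqlSketch` is a made-up helper, and it renders the values inline as text rather than as bound parameters.

```java
final class BatchInsertSqlSketch {

    /** Renders a multi-row insert with one millisecond of offset per row. */
    static String build(int[] temperatures, float[] humidities) {
        if (temperatures.length != humidities.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        StringBuilder sql =
                new StringBuilder("insert into db.weather(ts, temperature, humidity) values");
        for (int i = 0; i < temperatures.length; i++) {
            // Row i is stamped now + i milliseconds so no two rows share a timestamp.
            sql.append(" (now + ").append(i).append("a, ")
               .append(temperatures[i]).append(", ")
               .append(humidities[i]).append(')');
        }
        return sql.toString();
    }
}
```

For two rows this produces `insert into db.weather(ts, temperature, humidity) values (now + 0a, 20, 45.5) (now + 1a, 21, 46.0)`.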

Controller layer

Mainly for testing purposes

package com.zew.demo.iot.ctrl;

import com.zew.demo.iot.model.Weather;
import com.zew.demo.iot.service.WeatherService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * @author Administrator
 */
@RestController
@RequestMapping("/weather")
public class WeatherController {

    private final WeatherService weatherService;

    public WeatherController(WeatherService weatherService) {
        this.weatherService = weatherService;
    }

    @ApiOperation(value = "initialize the database and the weather table")
    @GetMapping("/init")
    public boolean init() {
        return weatherService.init();
    }

    @ApiOperation(value = "insert data")
    @PostMapping
    public int saveWeather(@RequestBody Weather weather) {
        return weatherService.save(weather);
    }

    @ApiOperation(value = "insert multiple data")
    @PostMapping("/batch")
    public int batchSaveWeather(@RequestBody List<Weather> weatherList) {
        return weatherService.batchSave(weatherList);
    }

    @ApiOperation(value = "query data")
    @GetMapping("/{limit}/{offset}")
    public List<Weather> queryWeather(@PathVariable Long limit, @PathVariable Long offset) {
        return weatherService.query(limit, offset);
    }
}
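For trying the endpoints by hand, the sketch below shows how the final URLs are composed from the port and context path in the configuration section plus the controller mappings, with a small GET helper. It assumes the service runs on `localhost`; `WeatherApiSketch` is a made-up name and not part of the project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

final class WeatherApiSketch {

    // server.port=8080 and server.servlet.context-path=/iot/services; host is an assumption.
    static final String BASE = "http://localhost:8080/iot/services";

    static String initUrl() {
        return BASE + "/weather/init";
    }

    static String queryUrl(long limit, long offset) {
        return BASE + "/weather/" + limit + "/" + offset;
    }

    /** Performs a GET and returns the response body as a string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }
}
```

With the application running, `WeatherApiSketch.get(WeatherApiSketch.initUrl())` would call the init endpoint, and `queryUrl(10, 0)` addresses the first ten rows.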

Other

Entry class

package com.zew.demo.iot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 */
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Swagger

package com.zew.demo.iot;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @author zy
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2).select()
                .apis(RequestHandlerSelectors.basePackage("com.zew.demo.iot"))
                .paths(PathSelectors.any()).build().apiInfo(apiInfo());
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder().title("IOT-API").description("IOT- Interface Design").build();
    }
}

Configuration

server.port=8080
server.servlet.context-path=/iot/services
# database configuration
spring.datasource.driver-class-name=com.taosdata.jdbc.TSDBDriver
spring.datasource.url=jdbc:TAOS://127.0.0.1:6030/log
spring.datasource.username=root
spring.datasource.password=taosdata
# Druid connection pool configuration
spring.datasource.druid.initial-size=5
spring.datasource.druid.min-idle=5
spring.datasource.druid.max-active=5
spring.datasource.druid.max-wait=60000
spring.datasource.druid.validation-query=select server_status();
spring.datasource.druid.validation-query-timeout=5000
spring.datasource.druid.test-on-borrow=false
spring.datasource.druid.test-on-return=false
spring.datasource.druid.test-while-idle=true
spring.datasource.druid.time-between-eviction-runs-millis=60000
spring.datasource.druid.min-evictable-idle-time-millis=600000
spring.datasource.druid.max-evictable-idle-time-millis=900000
# mybatis
mybatis.mapper-locations=classpath:mapper/*.xml
# log
logging.level.com.zew.demo.iot.mapper=debug

Conclusion

  1. The project is open source; the repository link is here.
  2. EMQX 4.2.2 and TDengine 2.0.6 were used. Integrating TDengine still has plenty of pitfalls, especially the FQDN handling that is new in 2.0; I will cover that in a later post.
  3. To run the project on Windows, don't forget to install the dedicated TDengine client; the link is here.
  4. The project is only an example. It runs, but optimization and customization are up to you.
  5. This project borrows heavily from TDengine's Spring demo code.