1. Introduction
mica-mqtt is a simple, low-latency, high-performance open-source MQTT IoT component built on t-io. See the mica-mqtt-example module for usage examples.
2. Features
- Supports the MQTT v3.1, v3.1.1, and v5.0 protocols.
- Supports MQTT clients.
- Supports MQTT servers.
- Supports MQTT will messages.
- Supports MQTT retained messages.
- Supports custom message (MQ) processing and forwarding, which can be used to build clusters.
- Includes a demo of an MQTT client connecting to Aliyun MQTT.
- Supports quick integration into Spring Boot projects (mica-mqtt-spring-boot-starter).
3. To do
- Add WebSocket support (preliminary research succeeded).
- Optimize MQTT session handling and improve v5.0 support.
4. Changelog
- Subscription management is now integrated into session management.
- Added comments to MqttProperties.MqttPropertyType, taking the new MQTT v5.0 features into account.
- Added the Spring Boot starter for easier integration; earlier Spring Boot versions are also supported.
- Investigated the t-io WebSocket sub-protocol.
- Fixed problems when running on Java 8: NoSuchMethodError: java.nio.ByteBuffer.xxx
5. Spring Boot quick start
5.1 Adding a Dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.0.1</version>
</dependency>
5.2 Configuration items
Configuration item | Default | Description |
---|---|---|
mqtt.server.name | Mica-Mqtt-Server | Server name |
mqtt.server.port | 1883 | Port |
mqtt.server.ip | 127.0.0.1 | Server IP |
mqtt.server.buffer-allocator | Heap memory | Buffer allocation: heap memory or off-heap memory |
mqtt.server.heartbeat-timeout | 120s | Heartbeat timeout (in ms: 1000 x 120). Set it to 0 or a negative value if the framework layer should not perform heartbeat checks |
mqtt.server.read-buffer-size | 8092 | Buffer size for receiving data |
mqtt.server.max-bytes-in-message | 8092 | Maximum message length, in bytes, accepted when parsing |
mqtt.server.debug | false | Enables debug mode |
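The heartbeat rule in the table can be read as the following minimal sketch. It is not mica-mqtt or t-io code: the class and method names are hypothetical, and it only illustrates the two facts stated above, namely that the default of 120s equals 1000 x 120 ms and that a value of 0 or less switches the check off.

```java
import java.time.Duration;

// Hypothetical helper, not part of mica-mqtt: illustrates the heartbeat-timeout rule.
final class HeartbeatTimeoutSketch {

    /** A timeout of 0 or a negative value means heartbeat checking is disabled. */
    static boolean heartbeatEnabled(long timeoutMillis) {
        return timeoutMillis > 0;
    }

    /** True if the connection has been idle for longer than the timeout. */
    static boolean isExpired(long lastActivityMillis, long nowMillis, long timeoutMillis) {
        return heartbeatEnabled(timeoutMillis)
                && nowMillis - lastActivityMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long defaultTimeout = Duration.ofSeconds(120).toMillis();
        System.out.println(defaultTimeout);                        // 120000
        System.out.println(isExpired(0, 120_001, defaultTimeout)); // true
        System.out.println(isExpired(0, 999_999, 0));              // false (disabled)
    }
}
```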
5.3 Implementable interfaces (register as Spring beans)
Interface | Required | Description |
---|---|---|
IMqttServerAuthHandler | Yes | Client authentication |
IMqttMessageListener | Yes | Message listener |
IMqttConnectStatusListener | Yes | Connection status listener |
IMqttSessionManager | No | Session management |
IMqttMessageStore | Cluster: yes; single node: no | Storage for will and retained messages |
IMqttMessageDispatcher | Cluster: yes; single node: no | Message forwarding |
IpStatListener | No | t-io IP status listener |
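As an illustration of what an authentication handler might do, here is a minimal, self-contained sketch. It does not use the real IMqttServerAuthHandler: the AuthHandler interface and its authenticate method are hypothetical stand-ins that only mirror the three-argument (clientId, userName, password) lambda shape used in the server example in this document. Passwords are kept in plain text purely for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; AuthHandler is a local stand-in, not the mica-mqtt interface.
final class AuthHandlerSketch {

    /** Stand-in with the same three-argument shape as the authHandler lambda. */
    @FunctionalInterface
    interface AuthHandler {
        boolean authenticate(String clientId, String userName, String password);
    }

    /** Builds a handler that accepts a client only if the user name and password match. */
    static AuthHandler fromCredentials(Map<String, String> passwordsByUser) {
        return (clientId, userName, password) ->
                userName != null
                        && password != null
                        && password.equals(passwordsByUser.get(userName));
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("admin", "123456");
        AuthHandler handler = fromCredentials(users);
        System.out.println(handler.authenticate("client-1", "admin", "123456")); // true
        System.out.println(handler.authenticate("client-1", "admin", "wrong"));  // false
    }
}
```

In a Spring Boot project the equivalent logic would live in a bean implementing the real interface; check the library's own signature before adapting this.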
5.4 (Optional) Custom Configuration
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {

    @Bean
    public MqttServerCustomizer activeRecordPluginCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // Customize the creator here; settings made here override those in the YAML configuration
                System.out.println("----------------MqttServerCustomizer-----------------");
            }
        };
    }
}
6. Plain Java project integration
6.1 Maven dependency
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.0.1</version>
</dependency>
6.2 mica-mqtt client
// Initialize the MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)                     // Default: 1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)    // Default: 3_1_1
    .clientId("xxxxxx")             // Default: the "mica-mqtt-" prefix plus the nanosecond time in base 36
    .connect();                     // Connect

// Subscribe to messages; the other subXxx methods work similarly
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Unsubscribe
client.unSubscribe("/test/#");

// Publish a message
client.publish("/test/client", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)));

// Disconnect
client.disconnect();
// Reconnect
client.reconnect();
// Stop
client.stop();
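The client example subscribes with the filter /test/# and publishes to /test/client. The sketch below shows, under the standard MQTT wildcard rules, why that topic matches that filter. It is an illustrative matcher only, not the matching code used by mica-mqtt; the class and method names are made up, and special handling of topics that start with $ is left out.

```java
// Illustrative sketch of MQTT topic filter matching; not mica-mqtt's implementation.
final class TopicFilterSketch {

    /** Returns true if the topic name matches the subscription filter. */
    static boolean matches(String filter, String topic) {
        // Limit -1 keeps empty levels: a leading "/" creates an empty first level.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: only valid as the last level. It matches
                // everything that remains, including the parent level itself.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            // "+" matches exactly one level; anything else must be equal.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/test/#", "/test/client")); // true
        System.out.println(matches("/test/#", "/test"));        // true
        System.out.println(matches("/test/+", "/test/a/b"));    // false
        System.out.println(matches("/other/#", "/test/123"));   // false
    }
}
```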
6.3 mica-mqtt server
// Note: add the JVM parameter -Xss129k to accept more connections with less memory
MqttServer mqttServer = MqttServer.create()
    // Default: 127.0.0.1
    .ip("127.0.0.1")
    // Default: 1883
    .port(1883)
    // Default: 8092 (the default maximum MQTT message size). Lower it to save memory;
    // if a message is larger than the buffer, t-io will try to parse it multiple times.
    // Choose a value that fits your actual workload.
    .readBufferSize(512)
    // Custom authentication
    .authHandler((clientId, userName, password) -> true)
    // Message listener
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
    })
    // SSL configuration
    .useSsl("", "", "")
    // Custom listener for clients going online and offline
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) {
        }

        @Override
        public void offline(String clientId) {
        }
    })
    // Custom message forwarding; a cluster can be built by broadcasting through an MQ
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) {
        }

        @Override
        public boolean send(Message message) {
            return false;
        }

        @Override
        public boolean send(String clientId, Message message) {
            return false;
        }
    })
    .debug() // Enable t-io debug logging
    .start();

// Publish to a single client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)), MqttQoS.EXACTLY_ONCE);
// Publish to all online clients subscribed to this topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("Mica's best skin.".getBytes(StandardCharsets.UTF_8)), MqttQoS.EXACTLY_ONCE);
// Stop the server
mqttServer.stop();
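The readBufferSize comment in the server example notes that a message larger than the read buffer is parsed over several attempts. The sketch below illustrates the general idea rather than t-io's actual decoder: it reads the MQTT fixed header (one type byte followed by the variable-length Remaining Length field) and reports "need more data" until the whole packet has arrived. The class name, method name, and sentinel value are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of incremental MQTT framing; not t-io or mica-mqtt code.
final class RemainingLengthSketch {

    /** Sentinel: the buffer does not yet hold a complete packet. */
    static final int NEED_MORE_DATA = -1;

    /**
     * Returns the total packet size (fixed header plus body) if the buffer holds
     * a complete packet, or NEED_MORE_DATA otherwise. Does not consume the buffer.
     */
    static int completePacketSize(ByteBuffer buf) {
        int start = buf.position();
        int available = buf.remaining();
        int remainingLength = 0;
        int multiplier = 1;
        int index = 1; // index 0 is the packet type and flags byte
        int encoded;
        do {
            if (index > 4) {
                // The Remaining Length field is at most four bytes long.
                throw new IllegalArgumentException("Malformed Remaining Length");
            }
            if (index >= available) {
                return NEED_MORE_DATA; // the length field itself is incomplete
            }
            encoded = buf.get(start + index) & 0xFF;
            remainingLength += (encoded & 0x7F) * multiplier; // low 7 bits carry data
            multiplier *= 128;
            index++;
        } while ((encoded & 0x80) != 0); // high bit set: another length byte follows
        int total = index + remainingLength;
        return available >= total ? total : NEED_MORE_DATA;
    }

    public static void main(String[] args) {
        // PUBLISH packet (0x30) whose Remaining Length 321 is encoded as 0xC1 0x02.
        byte[] packet = new byte[3 + 321];
        packet[0] = 0x30;
        packet[1] = (byte) 0xC1;
        packet[2] = 0x02;
        System.out.println(completePacketSize(ByteBuffer.wrap(packet, 0, 100))); // -1
        System.out.println(completePacketSize(ByteBuffer.wrap(packet)));         // 324
    }
}
```

With a small read buffer, the first call corresponds to a parse attempt that has to wait for more bytes, which is the trade-off between memory use and repeated parsing that the comment describes.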
7. Demo
8. Related documentation
- mica-mqtt-spring-boot-starter documentation
- mica-mqtt documentation
- mica-mqtt releases
- t-io official documentation
- MQTT protocol documentation