EMQ: an open-source MQTT message server for millions of connections
EMQ (Erlang/Enterprise/Elastic MQTT Broker) is an open-source IoT MQTT message server built on the Erlang/OTP platform. Erlang/OTP is a soft-real-time, low-latency, distributed language platform. MQTT is a lightweight publish/subscribe messaging protocol for the Internet of Things.
The EMQ project is designed to carry massive numbers of MQTT connections from mobile and IoT terminals, and to route messages among those devices quickly and with low latency:
- Stably hosts large-scale MQTT client connections, with a single server node supporting 500,000 to 1 million connections.
- Distributed node clusters with fast, low-latency message routing; a single cluster supports 10 million routes.
- Message server extensions support custom authentication and efficient storage of messages in back-end databases.
- Full IoT protocol support: MQTT, MQTT-SN, CoAP, WebSocket, or proprietary protocols.
There are several MQTT SDKs to choose from
The following describes two of them, MQTTKit and MQTT-Client-Framework, both written in Objective-C (OC). For a Swift version, refer to CocoaMQTT.
1. **MQTTKit**: MQTTKit is no longer updated, but it works without problems for basic use.
pod 'MQTTKit'
The header file
#import <MQTTKit.h>
#define WEAKSELF __typeof(&*self) __weak weakSelf = self;
@property (nonatomic, strong) MQTTClient *client;
Initializing the connection
WEAKSELF
NSString *clientID = @"Test client - must be a globally unique ID";
MQTTClient *client = [[MQTTClient alloc] initWithClientId:clientID];
client.username = @"username";
client.password = @"password";
client.cleanSession = NO;
client.keepAlive = 20;
client.port = 11883;
self.client = client;
// Connect to the MQTT server
[client connectToHost:@"URL of the MQTT server" completionHandler:^(MQTTConnectionReturnCode code) {
    if (code == ConnectionAccepted) {
        NSLog(@"Connected to MQTT successfully 😁😁😁");
        // Subscribe once the connection has been accepted
        [weakSelf.client subscribe:@"Topic you need to subscribe to" withQos:AtLeastOnce completionHandler:^(NSArray *grantedQos) {
            NSLog(@"Subscribe returned %@", grantedQos);
        }];
    } else if (code == ConnectionRefusedBadUserNameOrPassword) {
        NSLog(@"MQTT user name or password is wrong");
    } else if (code == ConnectionRefusedUnacceptableProtocolVersion) {
        NSLog(@"MQTT protocol version is not acceptable");
    } else if (code == ConnectionRefusedIdentiferRejected) {
        NSLog(@"MQTT client identifier was rejected");
    } else if (code == ConnectionRefusedServerUnavailable) {
        NSLog(@"MQTT server is unavailable");
    } else {
        NSLog(@"MQTT connection is not authorized");
    }
}];
// Handle incoming messages
client.messageHandler = ^(MQTTMessage *message) {
    NSString *jsonStr = [[NSString alloc] initWithData:message.payload encoding:NSUTF8StringEncoding];
    NSLog(@"EasyMqttService received MQTT message %@", jsonStr);
};
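The branches in the completion handler above correspond to the CONNACK return codes of MQTT 3.1.1. As a rough sketch, the mapping from the numeric code to a log message could be written in plain C, which can be dropped into an Objective-C file unchanged. The enum and the helper `connack_describe` are hypothetical names for illustration and are not part of MQTTKit; only the numeric values come from the MQTT 3.1.1 specification.

```c
#include <assert.h>

/* CONNACK return codes as defined by MQTT 3.1.1.
 * The identifier names are illustrative, not MQTTKit's. */
enum connack_code {
    CONNACK_ACCEPTED = 0,
    CONNACK_UNACCEPTABLE_PROTOCOL_VERSION = 1,
    CONNACK_IDENTIFIER_REJECTED = 2,
    CONNACK_SERVER_UNAVAILABLE = 3,
    CONNACK_BAD_USERNAME_OR_PASSWORD = 4,
    CONNACK_NOT_AUTHORIZED = 5
};

/* Hypothetical helper: returns a short description for a return code. */
const char *connack_describe(int code) {
    /* The return code is a single byte on the wire. */
    assert(code >= 0 && code <= 255);
    switch (code) {
    case CONNACK_ACCEPTED:
        return "connection accepted";
    case CONNACK_UNACCEPTABLE_PROTOCOL_VERSION:
        return "unacceptable protocol version";
    case CONNACK_IDENTIFIER_REJECTED:
        return "client identifier rejected";
    case CONNACK_SERVER_UNAVAILABLE:
        return "server unavailable";
    case CONNACK_BAD_USERNAME_OR_PASSWORD:
        return "bad user name or password";
    case CONNACK_NOT_AUTHORIZED:
        return "not authorized";
    default:
        return "reserved return code";
    }
}
```

A table like this keeps the log messages in one place, so each refusal reason is reported with the meaning the protocol assigns to it.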
Subscribe to the topic
// Subscribe to a topic
[self.client subscribe:@"Topic you need to subscribe to" withQos:AtMostOnce completionHandler:^(NSArray *grantedQos) {
    NSLog(@"Subscribe returned %@", grantedQos);
}];
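The granted QoS values returned to the subscribe handler come from the broker's SUBACK packet. Under MQTT 3.1.1 each SUBACK return code is 0, 1, or 2 for a granted QoS, or 0x80 for a failed subscription, and a message is delivered at the lower of its publish QoS and the granted QoS. A minimal C sketch of those two rules follows; `suback_granted` and `effective_qos` are hypothetical helper names, and how MQTTKit itself surfaces a failure code is not covered here.

```c
#include <assert.h>
#include <stdbool.h>

/* SUBACK return code signalling that the broker refused the subscription. */
#define SUBACK_FAILURE 0x80

/* True if the SUBACK return code is a granted QoS (0, 1 or 2). */
bool suback_granted(unsigned char return_code) {
    return return_code <= 2;
}

/* QoS at which a message reaches the subscriber: the minimum of the
 * QoS it was published with and the QoS granted for the subscription. */
int effective_qos(int publish_qos, int granted_qos) {
    assert(publish_qos >= 0 && publish_qos <= 2);
    assert(granted_qos >= 0 && granted_qos <= 2);
    return publish_qos < granted_qos ? publish_qos : granted_qos;
}
```

For example, a message published at QoS 2 to a subscription granted at QoS 0 is delivered at QoS 0.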
Closing the MQTTKit client
-(void)closeMQTTClient {
    WEAKSELF
    // Unsubscribe first: an unsubscribe request can only be sent while connected
    [self.client unsubscribe:@"Subscribed topic" withCompletionHandler:^{
        NSLog(@"Unsubscribed");
        [weakSelf.client disconnectWithCompletionHandler:^(NSUInteger code) {
            // The client is disconnected when this completion handler is called
            NSLog(@"MQTT client is disconnected");
        }];
    }];
}
Send a message
[self.client publishString:postMsg toTopic:@"Topic to publish to, as defined by the server" withQos:AtLeastOnce retain:NO completionHandler:^(int mid) {
    if (cmd != METHOD_SOCKET_CHAT_TO) {
        NSLog(@"Publish returned message id %d", mid);
    }
}];
2. **MQTTClient**: MQTTClient (MQTT-Client-Framework) is still being updated, offers more configuration options, and can be configured for SSL.
Integrate it with CocoaPods:
pod 'MQTTClient/MinL'
pod 'MQTTClient/ManagerL'
pod 'MQTTClient/WebsocketL'
The header file
#import <MQTTWebsocketTransport.h>
#define WEAKSELF __typeof(&*self) __weak weakSelf = self;
@property (nonatomic, strong) MQTTSession *mySession;
The class also needs to adopt the protocols <MQTTSessionDelegate, MQTTSessionManagerDelegate>.
Initializing the connection
Basic usage
#import "MQTTClient.h"
@interface MyDelegate : ... <MQTTSessionDelegate>
...
MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
transport.host = @"localhost";
transport.port = 1883;
MQTTSession *session = [[MQTTSession alloc] init];
session.transport = transport;
session.delegate = self;
[session connectAndWaitTimeout:30]; //this is part of the synchronous API
WebSocket connection
WEAKSELF
NSString *clientID = @"Test client - must be globally unique ID";
MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
transport.host = @"MQTT server address";
transport.port = 8083; // Port number
transport.tls = YES;
MQTTSession *session = [[MQTTSession alloc] init];
NSString *linkUserName = @"username";
NSString *linkPassWord = @"password";
[session setUserName:linkUserName];
[session setClientId:clientID];
[session setPassword:linkPassWord];
[session setKeepAliveInterval:5];
session.transport = transport;
session.delegate = self;
self.mySession = session;
[self reconnect];
[self.mySession addObserver:self forKeyPath:@"state" options:NSKeyValueObservingOptionNew | NSKeyValueObservingOptionOld context:nil]; // Add an observer for state changes
websocket
Listen to response events
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
switch (self.mySession.status) {
case MQTTSessionManagerStateClosed:
NSLog(@"Connection closed");
break;
case MQTTSessionManagerStateClosing:
NSLog(@"Connection closing");
break;
case MQTTSessionManagerStateConnected:
NSLog(@"Already connected");
break;
case MQTTSessionManagerStateConnecting:
NSLog(@"Connecting");
break;
case MQTTSessionManagerStateError: {
// NSString *errorCode = self.mySession.lastErrorCode.localizedDescription;
NSString *errorCode = self.mySession.description;
NSLog(@"Abnormal connection ----- %@",errorCode);
}
break;
case MQTTSessionManagerStateStarting:
NSLog(@"Start connecting");
break;
default:
break;
}
}
The session delegate protocol
Connection return state
-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error {
    if (eventCode == MQTTSessionEventConnected) {
        NSLog(@"Connected to MQTT successfully");
    } else if (eventCode == MQTTSessionEventConnectionRefused) {
        NSLog(@"MQTT connection refused");
    } else if (eventCode == MQTTSessionEventConnectionClosed) {
        NSLog(@"MQTT connection closed");
    } else if (eventCode == MQTTSessionEventConnectionError) {
        NSLog(@"MQTT connection error");
    } else if (eventCode == MQTTSessionEventProtocolError) {
        NSLog(@"MQTT protocol error");
    } else { // MQTTSessionEventConnectionClosedByBroker
        NSLog(@"Other MQTT connection error");
    }
    if (error) {
        NSLog(@"Connection error -- %@", error);
    }
}
Receiving messages
-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid
{
    NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data options:NSJSONReadingMutableContainers error:nil];
    NSLog(@"EasyMqttService received MQTT message %@", dic);
    // Handle the message here
}
Subscribe to the topic
Basic usage
// This call can be wrapped in a method and called from outside
[session subscribeToTopic:@"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
    if (error) {
        NSLog(@"Subscription failed %@", error.localizedDescription);
    } else {
        NSLog(@"Subscription successful! Granted Qos: %@", gQoss);
    }
}]; // this is part of the block API
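The filter "example/#" in the snippet above uses the MQTT multi-level wildcard. To illustrate which topics such a filter covers, here is a small C sketch of the MQTT 3.1.1 matching rules: `+` matches exactly one level, `#` matches the remaining levels (including the parent level itself), and topics starting with `$` are not matched by a filter that starts with a wildcard. The function `topic_matches` is a hypothetical helper written for this explanation; it assumes a well-formed filter and is not how the broker or MQTTClient implements matching.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Returns true if `topic` is covered by `filter` under the MQTT 3.1.1
 * wildcard rules. Assumes `filter` is well-formed ('#' only at the end,
 * wildcards occupying a whole level). */
bool topic_matches(const char *filter, const char *topic) {
    assert(filter != NULL && topic != NULL);

    /* "$..." topics are never matched by a filter starting with a wildcard. */
    if (topic[0] == '$' && (filter[0] == '#' || filter[0] == '+')) {
        return false;
    }

    while (*filter != '\0') {
        if (*filter == '#') {
            return true; /* multi-level wildcard covers everything left */
        }
        if (*filter == '+') {
            /* single-level wildcard: skip one whole topic level */
            while (*topic != '\0' && *topic != '/') {
                topic++;
            }
            filter++;
            continue;
        }
        if (*topic == '\0') {
            /* topic ended early: "a/#" still matches the parent "a" */
            return filter[0] == '/' && filter[1] == '#' && filter[2] == '\0';
        }
        if (*filter != *topic) {
            return false;
        }
        filter++;
        topic++;
    }
    return *topic == '\0';
}
```

With this sketch, "example/#" covers "example", "example/a", and "example/a/b", but not "examples/a".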
websocket
- (void)subscibeToTopicAction {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_async(dispatch_get_main_queue(), ^{
[self subscibeToTopic:@"Topics you want to subscribe to"];
});
});
}
-(void)subscibeToTopic:(NSString *)topicUrl
{
    // self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelAtMostOnce] forKey:topicUrl];
    [self.mySession subscribeToTopic:topicUrl atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        if (error) {
            NSLog(@"Failed to subscribe to %@ %@", topicUrl, error);
        } else {
            NSLog(@"Subscribed to %@ successfully, granted QoS %@", topicUrl, gQoss);
            dispatch_async(dispatch_get_main_queue(), ^{
                // Follow-up operation
            });
        }
    }];
}
Close the MQTT Client
-(void)closeMQTTClient {
    WEAKSELF
    // Unsubscribe first: an unsubscribe request can only be sent while connected
    [self.mySession unsubscribeTopics:@[@"Subscribed topic"] unsubscribeHandler:^(NSError *error) {
        if (error) {
            NSLog(@"Unsubscribe failed");
        } else {
            NSLog(@"Unsubscribe succeeded");
        }
        [weakSelf.mySession disconnect];
    }];
}
Send a message
[self.mySession publishData:jsonData onTopic:@"Topic to publish to, as defined by the server" retain:NO qos:MQTTQosLevelAtMostOnce publishHandler:^(NSError *error) {
    if (error) {
        NSLog(@"Send failed - %@", error);
    } else {
        NSLog(@"Sent successfully");
    }
}];