EMQ – Million level Open Source MQTT messaging Server

EMQ (Erlang/Enterprise/Elastic MQTT Broker) is based on the Erlang/OTP platform development of open source iot MQTT message server. Erlang/OTP is an excellent soft-realtime, low-latency, Distributed language platform. MQTT is a Lightweight, PubSub Internet of Things messaging protocol.

The design objective of EMQ project is to carry mass MQTT connections of mobile terminals or iot terminals, and to realize fast and low delay message routing among mass iot devices:

  1. Stably hosts large-scale MQTT client connections, with a single server node supporting 500,000 to 1 million connections.
  2. Distributed node cluster, fast and low latency message routing, single cluster supports 10 million routes.
  3. Message server extension supports customized authentication and efficient storage of messages to back-end databases.
  4. Full iot protocol support, MQTT, MQTT-SN, CoAP, WebSocket or proprietary protocol support.

The choice of MQTT SDK comes in a variety of categories

The following describes two MQTTKit and MQTT-client-framework, both of which are OC using Swift version, refer to CocoaMQTT1, ** MQTTKit ** has not been updated but has no problem in basic use

pod ‘MQTTKit’

The header file

#import <MQTTKit.h> #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTClient *client;

Initializing links

	WEAKSELF
    NSString *clientID = @"Test client - must be globally unique ID";
    MQTTClient *client = [[MQTTClient alloc] initWithClientId:StrFormat(@"% @", clientID)];
    client.username = @"username";
    client.password = @"password";
    client.cleanSession = false; client.keepAlive = 20; client.port = 11883; Self. client = client; MQTT [client connectToHost:@"URL of linked MQTT" completionHandler:^(MQTTConnectionReturnCode code) {
        if (code == ConnectionAccepted) {
            NSLog(@"Link MQTT successful 😁😁😁"); Weakself. client subscribe:@"Topics you need to subscribe to" withQos:AtLeastOnce completionHandler:^(NSArray *grantedQos) {
            DLog(@"Subscribe return %@",grantedQos);
        }];
        }else if (code == ConnectionRefusedBadUserNameOrPassword){
            NSLog(@"MQTT account or verification code error");
        } else if (code == ConnectionRefusedUnacceptableProtocolVersion){
            NSLog(@"Unacceptable agreement by MQTT");
        }else if (code == ConnectionRefusedIdentiferRejected){
            NSLog(@"MQTT does not endorse");
        }else if (code == ConnectionRefusedServerUnavailable){
            NSLog(@"MQTT rejects link");
        }else {
            NSLog(@"MQTT is not authorized"); }}]; MessageHandler = ^(MQTTMessage *message) {NSString *jsonStr = [[NSString alloc] initWithData:message.payload encoding:NSUTF8StringEncoding]; NSLog(@"EasyMqttService mqtt connect success %@",jsonStr);
    };
Copy the code

Subscribe to the topic

// Subscribe topic [self.client subscribe:@"Topics you need to subscribe to" withQos:AtMostOnce completionHandler:^(NSArray *grantedQos) {
        NSLog(@"Subscribe return %@",grantedQos);
    }];
}
Copy the code

Close the MQTTKit

-(void)closeMQTTClient{
    WEAKSELF
    [self.client disconnectWithCompletionHandler:^(NSUInteger code) {
        // The client is disconnected when this completion handler is called
        NSLog(@"MQTT client is disconnected");
        [weakSelf.client unsubscribe:@"Subscribed topics" withCompletionHandler:^{
            NSLog(@"Unsubscribe");
        }];
    }];
}
Copy the code

Send a message

   [self.client publishString:postMsg toTopic:@"The subject of the message sent depends on the server."  withQos:AtLeastOnce retain:NO completionHandler:^(int mid) {
        if(cmd ! = METHOD_SOCKET_CHAT_TO) { NSLog(@"Send message return %d",mid); }}];Copy the code

2, ** MQTTClient ** MQTTClient configuration is more sustainable update, can configure SSL

Connect pod ‘MQTTClient/MinL’ pod ‘MQTTClient/ManagerL’ pod ‘MQTTClient/WebsocketL’

The header file

#import < mqTTWebSocketTransport.h > #define WEAKSELF __typeof(&*self) #import < mqTTWebSocketTransport.h > #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTSession *mySession; Need to add the protocol header < MQTTSessionDelegate MQTTSessionManagerDelegate >

Initializing links

The basic use


#import "MQTTClient.h"
\@interface MyDelegate : ... <MQTTSessionDelegate>
...
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = @"localhost";
        transport.port = 1883;
        MQTTSession *session = [[MQTTSession alloc] init];
        session.transport = transport;
    	session.delegate = self;
    	[session connectAndWaitTimeout:30];  //this is part of the synchronous API
Copy the code

connection

	WEAKSELF
    NSString *clientID = @"Test client - must be globally unique ID";
    MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
    transport.host = @"Connect MQTT address"; transport.port = 8083; // Port number transport. TLS = YES; MQTTSession *session = [[MQTTSession alloc] init]; NSString *linkUserName = @"username";
    NSString *linkPassWord = @"password";
    [session setUserName:linkUserName];
    [session setClientId:clientID];
    [session setPassword:linkPassWord];
    [session setKeepAliveInterval:5];
    session.transport = transport;
    session.delegate = self;
    self.mySession = session;
    [self reconnect];
    [self.mySession addObserver:self forKeyPath:@"state"options:NSKeyValueObservingOptionNew|NSKeyValueObservingOptionOld context:nil]; // Add event listenerCopy the code

websocketListen to response events

- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    switch (self.mySession.status) {
        case MQTTSessionManagerStateClosed:
        NSLog(@"Connection closed");
        break;
        case MQTTSessionManagerStateClosing:
        NSLog(@"Connection closing");
        break;
        case MQTTSessionManagerStateConnected:
        NSLog(@"Already connected");
        break;
        case MQTTSessionManagerStateConnecting:
        NSLog(@"Connecting");
        
        break;
        case MQTTSessionManagerStateError: {
            //            NSString *errorCode = self.mySession.lastErrorCode.localizedDescription;
            NSString *errorCode = self.mySession.description;
            NSLog(@"Abnormal connection ----- %@",errorCode);
        }
        break;
        case MQTTSessionManagerStateStarting:
        NSLog(@"Start connecting");
        break;
        default:
        break; }}Copy the code

The session Delegate agreement

Connection return state

-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error{
    if (eventCode == MQTTSessionEventConnected) {
        NSLog(@"2222222 linked MQTT successful");
    }else if (eventCode == MQTTSessionEventConnectionRefused) {
            NSLog(@"MQTT rejects link");
   }else if (eventCode == MQTTSessionEventConnectionClosed){
            NSLog(@"MQTT link closed");
  }else if (eventCode == MQTTSessionEventConnectionError){
            NSLog(@"MQTT link error");
  }else if (eventCode == MQTTSessionEventProtocolError){
            NSLog(@"Unacceptable agreement by MQTT");
  }else{//MQTTSessionEventConnectionClosedByBroker
            NSLog(@"Other MQTT link error");
  }
   if (error) {
        NSLog(@"Link error -- %@",error); }}Copy the code

The sent message was received. Procedure

-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid
{
    NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data options:NSJSONReadingMutableContainers error:nil];
    NSLog(@"EasyMqttService mqtt connect success %@",dic); // do the corresponding operation}Copy the code

Subscribe to the topic

The basic use

// Method encapsulation can be called externally [Session subscribeToTopic:@]"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss){
    if (error) {
        NSLog(@"Subscription failed %@", error.localizedDescription);
    } else {
        NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss); }}]; // this is part of the block APICopy the code

websocket

- (void)subscibeToTopicAction {
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        dispatch_async(dispatch_get_main_queue(), ^{
            [self subscibeToTopic:@"Topics you want to subscribe to"];
            
        });
    });
}

-(void)subscibeToTopic:(NSString *)topicUrl
{
//    self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelAtMostOnce] forKey:topicUrl];
    [self.mySession subscribeToTopic:topicUrl atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        if (error) {
            NSLog(@"Failed to subscribe %@ %@",topicUrl,error);
        }else
        {
            NSLog(@"Subscribe %@ successful g1oss %@",topicUrl,gQoss); Dispatch_async (dispatch_get_main_queue(), ^{// operation}); }; }]; }Copy the code

Close the MQTT Client

-(void)closeMQTTClient{
    [self.mySession disconnect];
    [self.mySession unsubscribeTopics:@[@"Subscribed topics"] unsubscribeHandler:^(NSError *error) {
        if (error) {
            DLog(@"Unsubscribe failed");
        }else{
            DLog(@"Unsubscribe succeeded"); }}]; }Copy the code

Send a message

    [self.mySession publishData:jsonData onTopic:@"Topic server definition for sending messages" retain:NO qos:MQTTQosLevelAtMostOnce publishHandler:^(NSError *error) {
        if (error) {
            NSLog(@"Send failed - %@",error);
        }else{
            NSLog(@"Sent successfully"); }}];Copy the code