Learn the MQTT protocol and emQTTD open source project emqtt.com/

The source version of EMQTTD is V1.1.3. Emqtt.com/downloads/1…

\

The source code is written around, need to go through the following module call, entry is emQTTD_client module, handle_INFO function is responsible for receiving socket data:

Note: gen_server:call is synchronous, cast is asynchronous. Call is handLE_INFO and cast is handLE_cast.) \

\

1, the module (emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,
                                      packet_opts = PacketOpts,
                                      proto_state = ProtoState}) ->
Copy the code

\

2, – the module (emqttd_parser).

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
Copy the code

\

3, – the module (emqttd_protocol).

Parsing PUBLISH messages

received(Packet = ? PACKET(_Type), State) -> process(Packet = ? PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> publish(Packet = ? PUBLISH_PACKET(? QOS_0, _PacketId), #proto_state{client_id = ClientId, username = Username, session = Session}) ->Copy the code

\

4, – the module (emqttd_session).

publish(_SessPid, Msg = #mqtt_message{qos = ? QOS_0}) ->Copy the code

\

5, – the module (emqttd).

publish(Msg) when is_record(Msg, mqtt_message) ->
Copy the code

\

6, – the module (emqttd_server).

publish(Msg = #mqtt_message{from = From}) ->
Copy the code

\

7, – the module (emqttd_pubsub).

Both local and cluster nodes are considered here

publish(Topic, Msg) -> lists:foreach( fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> ? MODULE:dispatch(To, Msg); (#mqtt_route{topic = To, node = Node}) -> rpc:cast(Node, ? MODULE, dispatch, [To, Msg]) end, emqttd_router:lookup(Topic)).Copy the code

\

dispatch(Topic, Msg) ->
SubPid ! {dispatch, Topic, Msg};
Copy the code

\

8, – the module (emqttd_session).

handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
    when is_record(Msg, mqtt_message) ->
Copy the code

If the Client is online and offline, and CleanSession=0/QoS=1,2, the offline message needs to be stored. \

%% Queue message if client disconnected dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) -> hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); %% Deliver qos0 message directly to client dispatch(Msg = #mqtt_message{qos = ? QOS0}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, hibernate(Session); dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) when QoS =:= ? QOS1 orelse QoS =:= ? QOS2 -> case check_inflight(Session) of true -> noreply(deliver(Msg, Session)); false -> hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end.Copy the code

\

9, – the module (emqttd_client).

handle_info({deliver, Message}, State) ->
    with_proto_state(fun(ProtoState) ->
                       emqttd_protocol:send(Message, ProtoState)
                     end, State);
Copy the code

\

10, – the module (emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})
    when is_record(Packet, mqtt_packet) ->
Copy the code

\