Learn the MQTT protocol and emQTTD open source project emqtt.com/
The source version of EMQTTD is V1.1.3. Emqtt.com/downloads/1…
\
The source code is written around, need to go through the following module call, entry is emQTTD_client module, handle_INFO function is responsible for receiving socket data:
Note: gen_server:call is synchronous, cast is asynchronous. Call is handLE_INFO and cast is handLE_cast.) \
\
1, the module (emqttd_client).
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,
packet_opts = PacketOpts,
proto_state = ProtoState}) ->
Copy the code
\
2, – the module (emqttd_parser).
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
Copy the code
\
3, – the module (emqttd_protocol).
Parsing PUBLISH messages
received(Packet = ? PACKET(_Type), State) -> process(Packet = ? PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> publish(Packet = ? PUBLISH_PACKET(? QOS_0, _PacketId), #proto_state{client_id = ClientId, username = Username, session = Session}) ->Copy the code
\
4, – the module (emqttd_session).
publish(_SessPid, Msg = #mqtt_message{qos = ? QOS_0}) ->Copy the code
\
5, – the module (emqttd).
publish(Msg) when is_record(Msg, mqtt_message) ->
Copy the code
\
6, – the module (emqttd_server).
publish(Msg = #mqtt_message{from = From}) ->
Copy the code
\
7, – the module (emqttd_pubsub).
Both local and cluster nodes are considered here
publish(Topic, Msg) -> lists:foreach( fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> ? MODULE:dispatch(To, Msg); (#mqtt_route{topic = To, node = Node}) -> rpc:cast(Node, ? MODULE, dispatch, [To, Msg]) end, emqttd_router:lookup(Topic)).Copy the code
\
dispatch(Topic, Msg) ->
SubPid ! {dispatch, Topic, Msg};
Copy the code
\
8, – the module (emqttd_session).
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
when is_record(Msg, mqtt_message) ->
Copy the code
If the Client is online and offline, and CleanSession=0/QoS=1,2, the offline message needs to be stored. \
%% Queue message if client disconnected dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) -> hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); %% Deliver qos0 message directly to client dispatch(Msg = #mqtt_message{qos = ? QOS0}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, hibernate(Session); dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) when QoS =:= ? QOS1 orelse QoS =:= ? QOS2 -> case check_inflight(Session) of true -> noreply(deliver(Msg, Session)); false -> hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end.Copy the code
\
9, – the module (emqttd_client).
handle_info({deliver, Message}, State) ->
with_proto_state(fun(ProtoState) ->
emqttd_protocol:send(Message, ProtoState)
end, State);
Copy the code
\
10, – the module (emqttd_protocol).
send(Packet, State = #proto_state{sendfun = SendFun})
when is_record(Packet, mqtt_packet) ->
Copy the code
\