Learning the MQTT protocol and the emqttd open-source project: emqtt.com/
The emqttd source version studied here is v1.1.3: emqtt.com/downloads/1…
\
This walkthrough follows a SUBSCRIBE request through the source code, which passes through the chain of module calls listed below. The entry point is the emqttd_client module, whose handle_info function is responsible for receiving socket data:
(Note: gen_server:call is synchronous and gen_server:cast is asynchronous. A call is handled by handle_call and a cast is handled by handle_cast.)
\
1. -module(emqttd_client).
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,
packet_opts = PacketOpts,
proto_state = ProtoState}) ->
Copy the code
\
2. -module(emqttd_parser).
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
Copy the code
\
3. -module(emqttd_protocol).
Parsing a SUBSCRIBE message
%% Handle a SUBSCRIBE packet.
%% Every topic in TopicTable is checked against the ACL for the current client.
%% If any topic is denied, the whole request is rejected: an error is logged and
%% a SUBACK carrying 16#80 (the MQTT "failure" return code) for each requested
%% topic is sent back. Otherwise the subscription is forwarded to the session
%% process and the protocol state is returned unchanged.
process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable),
            {ok, State}
    end;
Copy the code
\
4. -module(emqttd_session). \
%% Ask the session process SessPid to subscribe to the topics in TopicTable.
%% The request is sent as an asynchronous cast. It carries a callback that,
%% when invoked with the granted QoS list, sends {suback, PacketId, GrantedQos}
%% to the process that called subscribe/3.
subscribe(SessPid, PacketId, TopicTable) ->
    Caller = self(),
    NotifyAck = fun(GrantedQos) -> Caller ! {suback, PacketId, GrantedQos} end,
    gen_server2:cast(SessPid, {subscribe, TopicTable, NotifyAck}).
Copy the code
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) ->
Copy the code
\
5. -module(emqttd). \
%% Facade API: subscribe ClientId to Topic at the given Qos.
%% Delegates directly to emqttd_server:subscribe/3 and returns its result.
subscribe(ClientId, Topic, Qos) ->
    emqttd_server:subscribe(ClientId, Topic, Qos).
Copy the code
\
6. -module(emqttd_server).
%% Subscribe the calling process to Topic on behalf of ClientId.
%% Issues a synchronous call to the server picked for the caller; the QoS is
%% normalised through the ?QOS_I macro before being sent.
subscribe(ClientId, Topic, Qos) ->
    From = self(),
    call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).

%% Server side of subscribe/3: register the subscriber with the pubsub layer,
%% then record the subscription and refresh the subscription statistics inside
%% the fun handed to if_subsciption/2.
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
    %% NOTE(review): the excerpt breaks off here; the remainder of this clause
    %% (including its reply) is not shown in the article.

%% Register {SubPid, Topic} in the 'subscribed' ETS table and ask emqttd_pubsub
%% to subscribe asynchronously, unless that exact pair is already recorded
%% (in which case false is returned).
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.

%% Persist an #mqtt_subscription{} record in the Mnesia 'subscription' table
%% with a dirty (non-transactional) write.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).
Copy the code
\
7. -module(emqttd_pubsub).
This module stores the subscribed topics and node names in the Mnesia database, so that clustering is taken into account;
the subscribed topics and subscriber process ids are also stored in ETS tables.
%% Asynchronously subscribe SubPid to Topic (which must be a binary).
%% The worker is chosen by pick/1 from the topic, and the request is delivered
%% to it as a cast.
async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    Worker = pick(Topic),
    cast(Worker, {subscribe, Topic, SubPid}).
%% Handle an asynchronous subscribe request: add the subscriber first, then
%% continue with the state returned by setstats/1.
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
    NewState = setstats(State),
    {noreply, NewState};
%% Record SubPid as a subscriber of Topic in the 'subscriber' ETS table.
%% When the table has no entry for Topic yet, the topic and its route to the
%% local node are first added inside a Mnesia transaction and the topic
%% statistics are refreshed.
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        true ->
            ok;
        false ->
            %% NOTE(review): the transaction result is not inspected here.
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic)
    end,
    ets:insert(subscriber, {Topic, SubPid}).
Copy the code
%% Add the topic record, then add the route from Topic to Node.
add_topic_route_(Topic, Node) ->
    add_topic_(Topic),
    emqttd_router:add_route(Topic, Node).
%% Add a topic with an empty flag list.
add_topic_(Topic) ->
    add_topic_(Topic, []).
%% Write an #mqtt_topic{} record for Topic into the Mnesia 'topic' table unless
%% one is already stored. The lookup uses mnesia:wread/1, i.e. it takes a write
%% lock while reading.
add_topic_(Topic, Flags) ->
    case mnesia:wread({topic, Topic}) of
        [] ->
            mnesia:write(topic, #mqtt_topic{topic = Topic, flags = Flags}, write);
        [_] ->
            ok
    end.
Copy the code
8. -module(emqttd_router). \
%% Add a route from a binary Topic to the node named by the atom Node.
%% Builds the #mqtt_route{} record and hands it to add_route/1.
add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    Route = #mqtt_route{topic = Topic, node = Node},
    add_route(Route).
%% Store Route in the Mnesia 'route' table if it is not already present.
%% When no route exists yet for the topic, a wildcard topic is inserted into
%% the trie before the route is written. When routes already exist, the route
%% is written only if this exact record is not among them.
add_route_(#mqtt_route{topic = Topic} = Route) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true ->
                    emqttd_trie:insert(Topic);
                false ->
                    ok
            end,
            mnesia:write(route, Route, write);
        Existing ->
            case lists:member(Route, Existing) of
                true ->
                    ok;
                false ->
                    mnesia:write(route, Route, write)
            end
    end.
Copy the code
\
\
\