Notes from studying the MQTT protocol and the emqttd open source project: emqtt.com/

The emqttd source version studied here is v1.1.3: emqtt.com/downloads/1…

First, look at the EMQ documentation: emqtt.com/docs/v1/clu…

emqttd cluster setup and management

Suppose a cluster is deployed on two servers, s1.emqtt.io and s2.emqtt.io:

Node name                              Host name (FQDN)   IP address
[email protected] or [email protected]   s1.emqtt.io        192.168.0.10
[email protected] or [email protected]   s2.emqtt.io        192.168.0.20

Warning

The node name format is Name@Host. Host must be an IP address or an FQDN (host name plus domain name).

Settings for node [email protected]

emqttd/etc/vm.args:

-name [email protected] or -name [email protected]

Warning

After a node is added to the cluster, the node name cannot be changed.

Settings for node [email protected]

emqttd/etc/vm.args:

-name [email protected] or -name [email protected]

Adding a Node to a Cluster

After the two nodes are started, run the following command on [email protected]:

$ ./bin/emqttd_ctl cluster join [email protected]

Join the cluster successfully.
Cluster status: [{running_nodes,['[email protected]','[email protected]']}]

Or, on [email protected]:

$ ./bin/emqttd_ctl cluster join [email protected]

Join the cluster successfully.
Cluster status: [{running_nodes,['[email protected]','[email protected]']}]

Querying cluster status on any node:

$ ./bin/emqttd_ctl cluster status

Cluster status: [{running_nodes,['[email protected]','[email protected]']}]

Removing a node from a cluster

A node can be taken out of the cluster in either of two ways:

  1. leave: the node itself leaves the cluster
  2. remove: another node is removed from the cluster

[email protected] leaves the cluster on its own:

$ ./bin/emqttd_ctl cluster leave

Or, on [email protected], remove [email protected] from the cluster:

$ ./bin/emqttd_ctl cluster remove [email protected]

How to use emqttd_ctl?

The commands are defined and registered in -module(emqttd_cli):

-export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
         routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
         listeners/1, vm/1, mnesia/1, trace/1]).
load() ->
    Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
    [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
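The registration pattern in load() can be tried in isolation. The following is a minimal sketch, not the emqttd source: the module name cli_sketch, its two toy commands, and the body of is_cmd/1 are assumptions made for illustration, since the excerpt does not show is_cmd/1. It collects the command names instead of calling emqttd_ctl:register_cmd/3.

```erlang
-module(cli_sketch).
-export([load/0, status/1, cluster/1]).

%% Two toy commands. Each receives the argument list from the command line.
status([]) -> io:format("node is running~n");
status(_)  -> io:format("usage: status~n").

cluster(["status"]) -> io:format("cluster status requested~n");
cluster(_)          -> io:format("usage: cluster status~n").

%% Walk the module's own export list and keep the functions that count
%% as commands. The result is sorted only to make it deterministic.
load() ->
    Cmds = [Fun || {Fun, _Arity} <- ?MODULE:module_info(exports), is_cmd(Fun)],
    lists:sort(Cmds).

%% Hypothetical filter: everything exported is a command, except the
%% loader itself and the compiler-generated module_info functions.
is_cmd(Fun) -> not lists:member(Fun, [load, module_info]).
```

With this approach, adding a command only requires exporting a new function of arity 1; the loader picks it up without a separate registry to maintain.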

What happens to the Mnesia database after joining the cluster?

The Mnesia database supports distributed clustering natively. After a node joins, its Mnesia tables are kept in sync with those of the other nodes, loosely comparable to master-slave replication in MySQL. Look at the source:

-module(emqttd_mnesia).

%% @doc Join the mnesia cluster
-spec(join_cluster(node()) -> ok).
join_cluster(Node) when Node =/= node() ->
    %% Stop mnesia and delete schema first
    ensure_ok(ensure_stopped()),
    ensure_ok(delete_schema()),
    %% Start mnesia and cluster to node
    ensure_ok(ensure_started()),
    ensure_ok(connect(Node)),
    ensure_ok(copy_schema(node())),
    %% Copy tables
    copy_tables(),
    ensure_ok(wait_for(tables)).

When a node joins, it deletes its own Mnesia schema and tables, then copies the schema and the tables from the node it is joining.
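The join sequence depends on each step succeeding before the next one runs. The excerpt does not show ensure_ok/1, so the sketch below is an assumption about its general shape: it passes ok through and throws on an error tuple, which stops the sequence at the first failing step. The module name join_sketch and the function join/1 are hypothetical, and the steps are plain funs rather than real Mnesia calls.

```erlang
-module(join_sketch).
-export([ensure_ok/1, join/1]).

%% Pass ok through; turn an error tuple into a throw so that the
%% caller's remaining steps are skipped.
ensure_ok(ok) -> ok;
ensure_ok({error, Reason}) -> throw({error, Reason}).

%% Run the steps in order. Each step is a fun returning ok or
%% {error, Reason}. Returns ok, or the first error encountered.
join(Steps) ->
    try
        lists:foreach(fun(Step) -> ensure_ok(Step()) end, Steps)
    catch
        throw:{error, Reason} -> {error, Reason}
    end.
```

For example, join_sketch:join([fun() -> ok end, fun() -> {error, no_node} end]) returns {error, no_node}, and any step listed after the failing one is never called.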

%% @doc Cluster with node.
-spec(connect(node()) -> ok | {error, any()}).
connect(Node) ->
    case mnesia:change_config(extra_db_nodes, [Node]) of
        {ok, [Node]} -> ok;
        {ok, []}     -> {error, {failed_to_connect_node, Node}};
        Error        -> Error
    end.

%% @doc Copy schema.
copy_schema(Node) ->
    case mnesia:change_table_copy_type(schema, Node, disc_copies) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, schema, Node, disc_copies}} ->
            ok;
        {aborted, Error} ->
            {error, Error}
    end.

%% @doc Copy mnesia tables.
copy_tables() ->
    emqttd_boot:apply_module_attributes(copy_mnesia).

The function copy_tables() finds every module in the emqttd project that declares the attribute -copy_mnesia({mnesia, [copy]}). and calls that module's mnesia(copy) function.

For example:

-module(emqttd_backend).

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(retained_message),
    ok = emqttd_mnesia:copy_table(backend_subscription).

-module(emqttd_router).

-copy_mnesia({mnesia, [copy]}).

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(route, ram_copies).
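The attribute-driven dispatch used by copy_tables() can be illustrated with a small standalone module. This is a sketch under assumptions, not the code of emqttd_boot: the module name attr_sketch is hypothetical, and the list of modules to scan is passed in explicitly, whereas the real function discovers the modules itself. It relies on the fact that a custom attribute such as -copy_mnesia({mnesia, [copy]}). is visible at runtime through module_info(attributes).

```erlang
-module(attr_sketch).
-export([apply_module_attributes/2, mnesia/1]).

%% Custom attribute: "call mnesia(copy) on this module".
-copy_mnesia({mnesia, [copy]}).

%% Stand-in for a real callback that would copy tables.
mnesia(copy) -> {copied, ?MODULE}.

%% For each module, read the attribute Name. Its value is a list of
%% {Function, Args} pairs; call each one and collect the results.
apply_module_attributes(Name, Modules) ->
    [{Mod, apply(Mod, Fun, Args)}
     || Mod <- Modules,
        {Fun, Args} <- proplists:get_value(Name, Mod:module_info(attributes), [])].
```

Calling attr_sketch:apply_module_attributes(copy_mnesia, [attr_sketch]) invokes attr_sketch:mnesia(copy). A module without the attribute contributes nothing to the result.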

The modules in the emqttd project that declare the attributes -boot_mnesia({mnesia, [boot]}). and -copy_mnesia({mnesia, [copy]}). are:

-module(emqttd_backend).
-module(emqttd_pubsub).
-module(emqttd_router).
-module(emqttd_server).
-module(emqttd_sm).
-module(emqttd_trie).

The tables retained_message and backend_subscription are of type disc_copies; the tables of the other modules are of type ram_copies.

Emqttd initializes Mnesia

1. -module(emqttd_app).

start(_StartType, _StartArgs) ->
    print_banner(),
    emqttd_mnesia:start(),

2. -module(emqttd_mnesia).

start() ->
    ensure_ok(ensure_data_dir()),
    ensure_ok(init_schema()),
    ok = mnesia:start(),
    init_tables(),
    wait_for(tables).

%% @doc Init mnesia schema or tables.
init_schema() ->
    case mnesia:system_info(extra_db_nodes) of
        []    -> mnesia:create_schema([node()]);
        [_|_] -> ok
    end.

%% @private
%% @doc Init mnesia tables.
init_tables() ->
    case mnesia:system_info(extra_db_nodes) of
        []    -> create_tables();
        [_|_] -> copy_tables()
    end.
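The create-or-copy decision in init_tables() can be reproduced on a single node with plain Mnesia calls. The sketch below is an assumption-laden illustration rather than the emqttd code: the module name mnesia_boot_sketch and the two-field route record are hypothetical, no schema is created on disk (Mnesia runs with a RAM-only schema), and only one ram_copies table is handled.

```erlang
-module(mnesia_boot_sketch).
-export([start/0]).

%% Hypothetical table layout, for illustration only.
-record(route, {topic, node}).

%% Start Mnesia, then create the table when this node stands alone,
%% or copy it when other db nodes are configured. Returns the mode used.
start() ->
    ok = mnesia:start(),
    Mode = case mnesia:system_info(extra_db_nodes) of
               []    -> create;
               [_|_] -> copy
           end,
    ok = init_table(Mode),
    ok = mnesia:wait_for_tables([route], 5000),
    Mode.

init_table(create) ->
    Opts = [{type, bag},
            {ram_copies, [node()]},
            {attributes, record_info(fields, route)}],
    case mnesia:create_table(route, Opts) of
        {atomic, ok}                       -> ok;
        {aborted, {already_exists, route}} -> ok
    end;
init_table(copy) ->
    case mnesia:add_table_copy(route, node(), ram_copies) of
        {atomic, ok}                               -> ok;
        {aborted, {already_exists, route, _Node}}  -> ok
    end.
```

On a standalone node with no extra_db_nodes configured, start/0 takes the create branch; the copy branch is only reached on a node that was started with other db nodes to contact.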

Example of table creation at boot and table copying, from -module(emqttd_backend):

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).

%% Mnesia callbacks
%%--------------------------------------------------------------------

mnesia(boot) ->
    ok = emqttd_mnesia:create_table(retained_message, [
                {type, ordered_set},
                {disc_copies, [node()]},
                {record_name, retained_message},
                {attributes, record_info(fields, retained_message)},
                {storage_properties, [{ets, [compressed]},
                                      {dets, [{auto_save, 1000}]}]}]),
    ok = emqttd_mnesia:create_table(backend_subscription, [
                {type, bag},
                {disc_copies, [node()]},
                {record_name, mqtt_subscription},
                {attributes, record_info(fields, mqtt_subscription)},
                {storage_properties, [{ets, [compressed]},
                                      {dets, [{auto_save, 5000}]}]}]);

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(retained_message),
    ok = emqttd_mnesia:copy_table(backend_subscription).

-module(emqttd_router).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).

mnesia(boot) ->
    ok = emqttd_mnesia:create_table(route, [
                {type, bag},
                {ram_copies, [node()]},
                {record_name, mqtt_route},
                {attributes, record_info(fields, mqtt_route)}]);

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(route, ram_copies).

5. Things to watch out for

1. Suppose the EMQ server's IP address is 192.168.0.10 and two nodes run on it, with node name A: [email protected] and node name B: [email protected]. If the two node names are identical, then once node A has started, node B will fail to start.

One of the two must be renamed. That is, in the node name format Name@Host, the Name part must differ between nodes on the same host.
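A quick way to catch such a clash before starting the nodes is to check a planned list of node names for duplicates. This is only a sketch: the module name nodename_sketch and the node names used in the usage note are hypothetical, and string:split/2 assumes Erlang/OTP 20 or later.

```erlang
-module(nodename_sketch).
-export([split/1, duplicates/1]).

%% Split a node name of the form Name@Host into its two parts.
split(Node) when is_atom(Node) ->
    [Name, Host] = string:split(atom_to_list(Node), "@"),
    {Name, Host}.

%% Return every node name that occurs more than once in the list.
%% Subtracting the de-duplicated list leaves only the extra occurrences.
duplicates(Nodes) ->
    lists:usort(Nodes -- lists:usort(Nodes)).
```

For instance, duplicates(['a@192.168.0.10', 'a@192.168.0.10', 'b@192.168.0.10']) reports 'a@192.168.0.10', while a list in which the Name part differs on each entry yields an empty result.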

2. The cluster information is recorded in the project directory, in /rel/emqttd/data/mnesia/[email protected]/schema.DAT

That is, when node A joins primary node B, the cluster membership is written to schema.DAT. Unless node A leaves the cluster, it reconnects to node B on its next restart.

★ Some open issues remain to be confirmed; I do not know whether they have been fixed in EMQ v2:

Problem (1): if node A has not left the cluster and node B is not available, A fails to start on its next restart. That is alarming!

Problem (2): after A joins B, the files retained_message.DCD and backend_subscription.DCD in A's directory /rel/emqttd/data/mnesia/[email protected]/ are deleted and do not reappear. This is strange, given that both Mnesia tables are of the persistent disc_copies type.

★ Tested on EMQ 2.3.7, with A as the primary node and B as the secondary node. The conclusions are:

(1) When B joins A, B deletes its own Mnesia tables and copies them from A. B also deletes its Mnesia tables when it leaves A.

(2) After B joins A, the Mnesia tables on A and B always stay identical. Adding, deleting, or updating data in A's tables is synchronized to B, and the same changes made on B are synchronized to A.

(3) The cluster information is recorded in the project directory, in /rel/emqttd/data/mnesia/[email protected]/schema.DAT. After the process on A or B exits, the cluster state is still in place when the process is restarted.
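Observations like these can be checked from an Erlang shell attached to a node by asking Mnesia which nodes it knows about and how each table is stored. The helper below is a sketch; the module name mnesia_inspect_sketch and the shape of the returned map are assumptions, while the mnesia:system_info/1 and mnesia:table_info/2 items it queries are standard.

```erlang
-module(mnesia_inspect_sketch).
-export([summary/0]).

%% Summarize the cluster view of the local node:
%%   db_nodes - nodes recorded in the schema
%%   running  - nodes on which Mnesia is currently running
%%   tables   - each table with its local storage type
%%              (ram_copies, disc_copies, disc_only_copies, or unknown)
summary() ->
    #{db_nodes => mnesia:system_info(db_nodes),
      running  => mnesia:system_info(running_db_nodes),
      tables   => [{Tab, mnesia:table_info(Tab, storage_type)}
                   || Tab <- mnesia:system_info(tables)]}.
```

Comparing db_nodes with running before and after a restart shows whether the membership stored in the schema survived, and the tables entry shows whether a table such as retained_message is still held as disc_copies on the local node.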

3. Common commands

./emqttd console
./emqttd start
./emqttd stop
./emqttd_ctl cluster join [email protected]
./emqttd_ctl cluster status
./emqttd_ctl cluster leave

werl -name [email protected] -setcookie emqsecretcookie
observer:start().

./_rel/emqttd/bin/emqttd console
./_rel/emqttd/bin/emqttd start
./_rel/emqttd/bin/emqttd_ctl status
./_rel/emqttd/bin/emqttd stop
./_rel/emqttd/bin/emqttd_ctl cluster join [email protected]
./_rel/emqttd/bin/emqttd_ctl cluster status
./_rel/emqttd/bin/emqttd_ctl cluster leave