I. Preparing the working environment

The Erlang Kafka client library used here is brod: github.com/klarna/brod

EMQ version: v2.3.5, github.com/emqtt/emq-r…

For setting up the Kafka runtime environment, see blog.csdn.net/libaineu200…

The integration is built as a plugin. My plugin path is /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps

Copy emq_plugin_template and rename the copy to emq_plugin_kafka_brod. Remember to rename the related configuration and source files as well.

Enter the directory /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod

Add the following lines to its Makefile:

BUILD_DEPS = emqttd cuttlefish brod
dep_brod = git github.com/klarna/brod… 3.4.0


II. Modifying the project files

1. Edit /home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile

and add:

DEPS += emq_plugin_kafka_brod

2. In relx.config, add the following lines to the release section:

brod,
{emq_plugin_kafka_brod, load}


3. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins, list the plugins to start:

emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka_brod.

4. The build fails with the following error:

DEP brod
make[2]: Entering directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
DEP supervisor3
Error: Unknown or invalid dependency: supervisor3.
make[2]: *** [/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/supervisor3] Error 78
make[2]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
make[1]: *** [deps] Error 2
make[1]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod'
make: *** [deps] Error 2

The fix is in the Makefile of /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod. Lines 12 and 13 read:

dep_supervisor3_commit = 1.1.5
dep_kafka_protocol_commit = 1.1.2

The error means erlang.mk could not resolve supervisor3 by name, so change these two lines to declare explicit git sources:

dep_supervisor3 = git github.com/klarna/supe… 1.1.5
dep_kafka_protocol = git github.com/klarna/kafk… 1.1.2


III. Source code (the complete source can be downloaded from download.csdn.net/download/li…)

1. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/etc/emq_plugin_kafka_brod.config

[{emq_plugin_kafka_brod, [
    {kafka, [
        {bootstrap_broker, [{"127.0.0.1", 9092}]}, %% "localhost" also works
        {query_api_versions, false},
        {reconnect_cool_down_seconds, 10}
    ]}
]}].

When the Kafka brokers run as a cluster, do not use localhost or 127.0.0.1; use the brokers' real IP addresses instead. Several endpoints can be listed, for example:

[{"172.16.6.170", 9092}, {"172.16.6.170", 9093}, {"172.16.6.170", 9094}]
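Every host above is quoted because brod_init/1 passes this list straight to brod:start_client/3 as the endpoint list, and throughout this article each entry has the form {Host, Port} with Host as a string; a bare 172.16.6.170 is not even a valid Erlang term. The module below is a minimal sketch, not part of the plugin: the names `kafka_endpoints_sketch` and `bootstrap_endpoints/1` are hypothetical. It reads `bootstrap_broker` from the `kafka` proplist and rejects entries that do not have that shape before they reach brod. It deliberately checks only the string-host form used here, not every host form brod might accept.

```erlang
%% Hypothetical helper module for illustration; not part of brod or EMQ.
-module(kafka_endpoints_sketch).
-export([bootstrap_endpoints/1]).

%% Read bootstrap_broker from the kafka proplist and check that it is a
%% non-empty list of {Host, Port} tuples (host as a string, port as an integer).
bootstrap_endpoints(KafkaEnv) ->
    case proplists:get_value(bootstrap_broker, KafkaEnv) of
        Endpoints when is_list(Endpoints), Endpoints =/= [] ->
            case lists:all(fun valid_endpoint/1, Endpoints) of
                true  -> {ok, Endpoints};
                false -> {error, {invalid_endpoints, Endpoints}}
            end;
        Other ->
            {error, {missing_bootstrap_broker, Other}}
    end.

%% A valid entry is {"host", Port} with a printable, non-empty host string.
valid_endpoint({Host, Port}) when is_list(Host), Host =/= [],
                                  is_integer(Port), Port > 0, Port < 65536 ->
    io_lib:printable_list(Host);
valid_endpoint(_) ->
    false.
```

In brod_init/1 it could stand in for the plain proplists:get_value/2 call, e.g. `{ok, KafkaBootstrapEndpoints} = kafka_endpoints_sketch:bootstrap_endpoints(Kafka)`, so a misconfigured list fails with a descriptive reason instead of deep inside brod.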


2. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/src/emq_plugin_kafka_brod.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_plugin_kafka_brod).

-include_lib("emqttd/include/emqttd.hrl").

-include_lib("brod/include/brod_int.hrl").

-define(TEST_TOPIC, <<"emqtest">>).

-export([load/1, unload/0]).

%% Hooks functions

-export([on_client_connected/3, on_client_disconnected/3]).

-export([on_client_subscribe/4, on_client_unsubscribe/4]).

-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

%% Called when the plugin application start
load(Env) ->
    brod_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        io:format("brod_produce_reply:ok ~n"),
        ok
    after 5000 ->
        io:format("brod_produce_reply:exit ~n"),
        erlang:exit(timeout)
        %%ct:fail({?MODULE, ?LINE, timeout})
    end,

    {ok, Client}.

on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),

    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        ok
    after 5000 ->
        %% ct:fail/1 belongs to Common Test, which a release normally does not include
        erlang:exit({?MODULE, ?LINE, timeout})
    end,

    ok.

on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
    
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.

on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).

on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.

on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.

on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).

%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};

on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    
    Id = Message#mqtt_message.id,
    From = Message#mqtt_message.from, %% its form differs depending on whether login is required; c/1 and u/1 handle both
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    Timestamp = Message#mqtt_message.timestamp,

    ClientId = c(From),
    Username = u(From),

    Json = mochijson2:encode([
			      {type, <<"publish">>},
			      {client_id, ClientId},
			      {message, [
					 {username, Username},
					 {topic, Topic},
					 {payload, Payload},
					 {qos, i(Qos)},
					 {dup, i(Dup)},
					 {retain, i(Retain)}
					]},
			      {cluster_node, node()},
			      {ts, emqttd_time:now_ms()}
			     ]),

    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        ok
    after 5000 ->
        erlang:exit({?MODULE, ?LINE, timeout})
    end,

    {ok, Message}.

on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

%% Called when the plugin application stop
unload() ->
    %%application:stop(brod),
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).

%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
    {ok, _} = application:ensure_all_started(brod),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
    %%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
    %%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
    %%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
    ClientConfig = [],%% socket error recovery
    Topic = ?TEST_TOPIC,
    Partition = 0,
    ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
    ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
    %%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
    %%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
    io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).

i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, _Username}) -> ClientId;
c(From) -> From.
u({_ClientId, Username}) -> Username;
u(From) -> From.

Please note:

(1) Make sure the topic already exists in Kafka before calling brod:start_producer; otherwise the call fails and the `ok = brod:start_producer(...)` match below raises an exception:

Topic = ?TEST_TOPIC,
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
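Because brod_init/1 uses `ok = brod:start_producer(...)`, a missing topic crashes the plugin while it loads. Below is a minimal sketch of a wrapper (hypothetical module `producer_start_sketch`) that logs the failure and returns it instead. It assumes brod:start_producer/3 reports failure by returning `{error, Reason}`; if your brod version raises an exception instead, this wrapper will not catch it. The start function is passed in as a fun so the logic can be exercised without a running broker.

```erlang
%% Hypothetical wrapper for illustration; not part of brod.
-module(producer_start_sketch).
-export([start_producer/4]).

%% StartFun is expected to be fun brod:start_producer/3 in the plugin. It is a
%% parameter so that this logic can be exercised without a Kafka broker.
start_producer(StartFun, Client, Topic, ProducerConfig) ->
    case StartFun(Client, Topic, ProducerConfig) of
        ok ->
            ok;
        {error, Reason} ->
            %% One cause noted above: the topic has not been created in Kafka yet.
            error_logger:error_msg("cannot start producer for topic ~s: ~p~n",
                                   [Topic, Reason]),
            {error, Reason}
    end.
```

In the plugin this would be called as `producer_start_sketch:start_producer(fun brod:start_producer/3, brod_client_1, Topic, [])`; the caller can then decide whether to retry later or keep EMQ running without the Kafka bridge.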

(2) The client ID passed to brod:start_client can also be defined like this:

%% At module level:
-define(CLIENT_ID, ?MODULE).

%% Inside the init function:
ClientId = ?CLIENT_ID,
BootstrapHosts = [{"localhost", 9092}],
%% cluster version:
%% BootstrapHosts = [{"172.16.6.170", 9092}, {"172.16.6.170", 9093}, {"172.16.6.170", 9094}],
ClientConfig = [],
ok = brod:start_client(BootstrapHosts, ClientId, ClientConfig),

Creating the Kafka topic

Note:

(1) When the Kafka brokers and ZooKeeper run as clusters, you must create the topic manually and pass the list of ZooKeeper nodes when doing so. In a standalone setup the topic should also be created in advance; otherwise the client reports an error.

(2) Do not use "." or "_" in topic names.

(3) Create the topic:

./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest

(4) Start a console consumer to check the messages:

./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2181 --topic emqtest --from-beginning


The code above always writes to Partition = 0. Because the Erlang brod library does not pick a partition automatically here, we have to compute one ourselves from a hash value:

-define(NUM_PARTITIONS, 3).
-define(EMPTY(S), (S == <<"">> orelse S == undefined)).

getPartition(ClientId) when ?EMPTY(ClientId) ->
    %% No client ID: choose a random partition in 0..NUM_PARTITIONS-1.
    %% crypto:rand_uniform/2 is deprecated in newer OTP releases; rand:uniform(N) - 1 replaces it.
    crypto:rand_uniform(0, ?NUM_PARTITIONS);
getPartition(ClientId) ->
    %% MD5 yields a 16-byte digest; only its last byte is used.
    <<_:15/binary, NodeD16>> = Val = crypto:hash(md5, ClientId),
    io:format("Value is ~w~n", [Val]),
    NodeD16 rem ?NUM_PARTITIONS.

%% For example:
ClientId = <<"123456">>,
Partition = getPartition(ClientId),
brod:produce(?KAFKA_CLIENT1, ?EMQ_TOPIC_ONLINE, Partition, ClientId, list_to_binary(Json)).

My solution (take the MD5 digest of the client ID, then the remainder modulo the number of partitions) is a simple custom scheme, not an implementation of Java's native hashCode. For that, see the source of java.lang.String.hashCode.
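To send keys to the same partitions that Kafka's rule `Utils.abs(key.hashCode) % numPartitions` (discussed next) would choose, the client has to reproduce Java's String.hashCode. The sketch below (hypothetical module `java_hash_sketch`) computes it over UTF-16 code units with 32-bit signed overflow. Two assumptions are labeled in the code: the key is treated as a Java String given here as valid UTF-8, and `Utils.abs` is taken to return 0 for Int.MinValue and the absolute value otherwise. The newer Java producer's default partitioner hashes the serialized key bytes with murmur2 instead, so this mirrors only the older String-keyed behavior.

```erlang
%% Hypothetical sketch for illustration; not part of brod or Kafka.
-module(java_hash_sketch).
-export([string_hash_code/1, partition/2]).

%% java.lang.String.hashCode(): s[0]*31^(n-1) + ... + s[n-1], computed over
%% UTF-16 code units with 32-bit signed overflow.
%% Assumption: the key is a valid UTF-8 binary.
string_hash_code(Utf8Bin) when is_binary(Utf8Bin) ->
    Utf16 = unicode:characters_to_binary(Utf8Bin, utf8, utf16),
    hash_units(Utf16, 0).

hash_units(<<Unit:16, Rest/binary>>, Acc) ->
    hash_units(Rest, to_int32(Acc * 31 + Unit));
hash_units(<<>>, Acc) ->
    Acc.

%% Keep the low 32 bits and reinterpret them as a signed integer.
to_int32(N) ->
    <<Signed:32/signed>> = <<N:32>>,
    Signed.

%% abs(hashCode) rem NumPartitions, following the formula quoted in this article.
partition(Key, NumPartitions) when is_integer(NumPartitions), NumPartitions > 0 ->
    kafka_abs(string_hash_code(Key)) rem NumPartitions.

%% Assumption: Utils.abs maps Int.MinValue to 0 and otherwise takes the absolute
%% value. Some Kafka versions use a bit mask instead, which changes the result
%% for negative hashes.
kafka_abs(-2147483648) -> 0;
kafka_abs(N) -> abs(N).
```

With it, the MD5-based getPartition above could be swapped for `Partition = java_hash_sketch:partition(ClientId, ?NUM_PARTITIONS)`.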


Here is Kafka's own partitioning logic (from the older Scala producer):

By default, Kafka routes a message by hashing its key, hash(key) % numPartitions:

def partition(key: Any, numPartitions: Int): Int = {
  Utils.abs(key.hashCode) % numPartitions
}

This ensures that messages with the same key are routed to the same partition. If no key is specified, how does Kafka decide which partition the message goes to?

if (key == null) { // no key specified
  val id = sendPartitionPerTopicCache.get(topic) // is a partition id for this topic already cached?
  id match {
    case Some(partitionId) =>
      partitionId // reuse the cached partition id
    case None =>
      val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) // all partitions that currently have a leader broker
      if (availablePartitions.isEmpty)
        throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
      val index = Utils.abs(Random.nextInt) % availablePartitions.size
      val partitionId = availablePartitions(index).partitionId
      sendPartitionPerTopicCache.put(topic, partitionId) // cache it so later messages can use it directly
      partitionId
  }
}

As you can see, for a message without a key Kafka picks a partition almost at random and caches the partition number for later messages. Kafka clears this cache itself (by default every 10 minutes, or whenever topic metadata is requested).
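The keyless branch above can be modeled in Erlang as a pure function over a per-topic cache. In this sketch (hypothetical module `keyless_partition_sketch`) the cache is a map from topic to partition id, the partition list is given as `{PartitionId, Leader}` pairs where `undefined` means no leader, and `RandFun` stands in for `Random.nextInt` so that the choice can be tested. It does not implement the periodic cache clearing; the caller would have to drop or reset the map (for example on a timer) to get that behavior.

```erlang
%% Hypothetical sketch for illustration; not part of brod or Kafka.
-module(keyless_partition_sketch).
-export([pick/4]).

%% Cache:      #{Topic => PartitionId}
%% Partitions: [{PartitionId, Leader}], Leader =:= undefined means no leader
%% RandFun:    fun() -> integer(), standing in for Random.nextInt
pick(Topic, Cache, Partitions, RandFun) ->
    case maps:find(Topic, Cache) of
        {ok, PartitionId} ->
            %% Cached: reuse it without re-checking leader availability.
            {ok, PartitionId, Cache};
        error ->
            case [Id || {Id, Leader} <- Partitions, Leader =/= undefined] of
                [] ->
                    {error, {no_leader_for_any_partition, Topic}};
                Available ->
                    Index = abs(RandFun()) rem length(Available),
                    PartitionId = lists:nth(Index + 1, Available),
                    %% Remember the choice for later keyless messages.
                    {ok, PartitionId, maps:put(Topic, PartitionId, Cache)}
            end
    end.
```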


The complete source code can be downloaded from download.csdn.net/download/li…