First, work environment preparation
The Erlang Kafka client library used here is ekaf: https://github.com/helpshift/ekaf
The EMQ version used is v2.3.5: github.com/emqtt/emq-relx
For setting up the Kafka runtime environment, see blog.csdn.net/libaineu200…
The integration is implemented as an EMQ plug-in; my plug-in directory is /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps
Make a copy of emq_plugin_template and rename it to emq_plugin_kafka_ekaf. Note that the related configuration files and source files must be renamed as well.
Go to the directory /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf \
In the plug-in's Makefile, add the following (highlighted in red in the original post):
BUILD_DEPS = emqttd cuttlefish ekaf
dep_ekaf = git https://github.com/helpshift/ekaf master\
\
Second, modify the project files
1. Open /home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile \
Add the following line to the Makefile:
DEPS += emq_plugin_kafka_ekaf
2. Add three lines to the release section of relx.config:
kafkamocker,
ekaf,
{emq_plugin_kafka_ekaf, load}\
3. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins, add the plug-in so that it is loaded at startup:
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka_ekaf.\
\
Third, source code implementation
1. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/etc/emq_plugin_kafka_ekaf.config
[
  {emq_plugin_kafka_ekaf, [
    {kafka, [
      {bootstrap_broker, {"127.0.0.1", 9092}}, %% cannot use "localhost"
      {partition_strategy, strict_round_robin}
    ]}
  ]}
].
Copy the code
\
2. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/src/emq_plugin_kafka_ekaf.erl
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emq_plugin_kafka_ekaf).
-include_lib("emqttd/include/emqttd.hrl").
-define(TEST_TOPIC,<<"emq_broker_message">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% @doc Plugin entry point, called when the plugin application starts.
%%
%% Runs ekaf_init/1 first, then attaches every hook callback exported by
%% this module to its hook point, passing `[Env]' as the third argument of
%% emqttd:hook/3. Hooks are attached in the order listed below and the
%% function returns whatever the final emqttd:hook/3 call ('message.acked')
%% returns.
load(Env) ->
    ekaf_init([Env]),
    HookArgs = [Env],
    Callbacks = [
        {'client.connected', fun ?MODULE:on_client_connected/3},
        {'client.disconnected', fun ?MODULE:on_client_disconnected/3},
        {'client.subscribe', fun ?MODULE:on_client_subscribe/4},
        {'client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4},
        {'session.created', fun ?MODULE:on_session_created/3},
        {'session.subscribed', fun ?MODULE:on_session_subscribed/4},
        {'session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4},
        {'session.terminated', fun ?MODULE:on_session_terminated/4},
        {'message.publish', fun ?MODULE:on_message_publish/2},
        {'message.delivered', fun ?MODULE:on_message_delivered/4},
        {'message.acked', fun ?MODULE:on_message_acked/4}
    ],
    %% The accumulator only carries the most recent result, so the fold
    %% yields the outcome of the last registration.
    lists:foldl(
        fun({HookPoint, Callback}, _Previous) ->
            emqttd:hook(HookPoint, Callback, HookArgs)
        end,
        ok,
        Callbacks
    ).
%% @doc Hook callback for 'client.connected'.
%%
%% Prints the client id and the ConnAck value, JSON-encodes a "connected"
%% event (client id, node name, and the value of emqttd_time:now_ms/0) and
%% hands it to ekaf for the ?TEST_TOPIC topic. Returns `{ok, Client}' with
%% the client record untouched.
%%
%% The event is produced twice: once as a bare value and once as a
%% `{Key, Value}' tuple keyed with <<"mykey_1">>.
%% NOTE(review): this looks like a demonstration of both ekaf call forms;
%% confirm that consumers really want two records per connect.
%%
%% ekaf:produce_async_batched/2 is deliberately not used: the original
%% author observed that batched sends reach Kafka with a delay.
on_client_connected(ConnAck, #mqtt_client{client_id = ClientId} = Client, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Event = [
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ],
    Encoded = list_to_binary(mochijson2:encode(Event)),
    ekaf:produce_async(?TEST_TOPIC, Encoded),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_1">>, Encoded}),
    {ok, Client}.
%% @doc Hook callback for 'client.disconnected'.
%%
%% Prints the client id and disconnect reason, JSON-encodes a "disconnected"
%% event (client id, reason, node name, and the value of
%% emqttd_time:now_ms/0) and hands it to ekaf for the ?TEST_TOPIC topic.
%% Always returns `ok'.
%%
%% The reason is converted with disconnect_reason_to_json/1 before encoding,
%% because mochijson2 cannot encode arbitrary terms such as
%% `{shutdown, Something}' and would otherwise crash the hook.
%%
%% The event is produced twice: once as a bare value and once as a
%% `{Key, Value}' tuple keyed with <<"mykey_2">>.
%% NOTE(review): this looks like a demonstration of both ekaf call forms;
%% confirm that consumers really want two records per disconnect.
%%
%% ekaf:produce_async_batched/2 is deliberately not used: the original
%% author observed that batched sends reach Kafka with a delay.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, disconnect_reason_to_json(Reason)},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    Payload = list_to_binary(Json),
    ekaf:produce_async(?TEST_TOPIC, Payload),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_2">>, Payload}),
    ok.

%% Turns a disconnect reason into a term mochijson2 can encode: atoms,
%% binaries and numbers pass through unchanged; any other term (tuples,
%% pids, ...) is rendered to a binary with ~w, the same directive used for
%% the reason in the log line above.
disconnect_reason_to_json(Reason) when is_atom(Reason); is_binary(Reason); is_number(Reason) ->
    Reason;
disconnect_reason_to_json(Reason) ->
    iolist_to_binary(io_lib:format("~w", [Reason])).
%% @doc Hook callback for 'client.subscribe'.
%% Prints the username, client id and requested topic table, then returns
%% `{ok, TopicTable}' with the table unchanged.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    LogArgs = [Username, ClientId, TopicTable],
    io:format("client(~s/~s) will subscribe: ~p~n", LogArgs),
    {ok, TopicTable}.
%% @doc Hook callback for 'client.unsubscribe'.
%% Prints the client id, username and topic table being unsubscribed, then
%% returns `{ok, TopicTable}' with the table unchanged.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    LogArgs = [ClientId, Username, TopicTable],
    io:format("client(~s/~s) unsubscribe ~p~n", LogArgs),
    {ok, TopicTable}.
%% @doc Hook callback for 'session.created'.
%% Prints the client id and username of the new session (no trailing
%% newline) and returns the result of io:format/2.
on_session_created(ClientId, Username, _Env) ->
    SessionOwner = [ClientId, Username],
    io:format("session(~s/~s) created.", SessionOwner).
%% @doc Hook callback for 'session.subscribed'.
%% Prints the username, client id and the `{Topic, Opts}' pair, then returns
%% that same pair wrapped as `{ok, {Topic, Opts}}'.
on_session_subscribed(ClientId, Username, {_, _} = Subscription, _Env) ->
    LogArgs = [Username, ClientId, Subscription],
    io:format("session(~s/~s) subscribed: ~p~n", LogArgs),
    {ok, Subscription}.
%% @doc Hook callback for 'session.unsubscribed'.
%% Prints the username, client id and the `{Topic, Opts}' pair that was
%% removed, then returns `ok'.
on_session_unsubscribed(ClientId, Username, {_, _} = Subscription, _Env) ->
    LogArgs = [Username, ClientId, Subscription],
    io:format("session(~s/~s) unsubscribed: ~p~n", LogArgs),
    ok.
%% @doc Hook callback for 'session.terminated'.
%% Prints the client id, username and termination reason (no trailing
%% newline) and returns the result of io:format/2.
on_session_terminated(ClientId, Username, Reason, _Env) ->
    LogArgs = [ClientId, Username, Reason],
    io:format("session(~s/~s) terminated: ~p.", LogArgs).
%% @doc Hook callback for 'message.publish'.
%%
%% Messages whose topic starts with "$SYS/" are returned untouched and are
%% not forwarded to Kafka. Every other message is printed, turned into a
%% JSON "publish" event and handed to ekaf for the ?TEST_TOPIC topic. In
%% both cases the message itself is returned unmodified as `{ok, Message}'.
%%
%% The JSON event carries the client id and username derived from the
%% message's `from' field via c/1 and u/1, the topic, payload, qos, dup and
%% retain fields (qos/dup/retain passed through i/1), the node name and the
%% value of emqttd_time:now_ms/0.
%%
%% NOTE(review): Payload is passed to mochijson2 as-is; presumably it must
%% be text that mochijson2 can encode — confirm how binary payloads should
%% be handled.
%%
%% ekaf:produce_async_batched/2 is deliberately not used: the original
%% author observed that batched sends reach Kafka with a delay.
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    %% Per the original author, the value of `from' differs depending on
    %% whether the client had to log in; c/1 and u/1 accept both a
    %% {ClientId, Username} pair and any other term.
    From = Message#mqtt_message.from,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    ClientId = c(From),
    Username = u(From),
    Json = mochijson2:encode([
        {type, <<"publish">>},
        {client_id, ClientId},
        {message, [
            {username, Username},
            {topic, Topic},
            {payload, Payload},
            {qos, i(Qos)},
            {dup, i(Dup)},
            {retain, i(Retain)}
        ]},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    {ok, Message}.
%% @doc Hook callback for 'message.delivered'.
%% Prints the username, client id and the message as rendered by
%% emqttd_message:format/1, then returns `{ok, Message}' unchanged.
on_message_delivered(ClientId, Username, Message, _Env) ->
    Rendered = emqttd_message:format(Message),
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, Rendered]),
    {ok, Message}.
%% @doc Hook callback for 'message.acked'.
%% Prints the username, client id and the message as rendered by
%% emqttd_message:format/1, then returns `{ok, Message}' unchanged.
on_message_acked(ClientId, Username, Message, _Env) ->
    Rendered = emqttd_message:format(Message),
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, Rendered]),
    {ok, Message}.
%% @doc Called when the plugin application stops.
%%
%% Detaches every hook callback of this module from its hook point, in the
%% order listed below, and returns whatever the final emqttd:unhook/2 call
%% ('message.acked') returns.
unload() ->
    Callbacks = [
        {'client.connected', fun ?MODULE:on_client_connected/3},
        {'client.disconnected', fun ?MODULE:on_client_disconnected/3},
        {'client.subscribe', fun ?MODULE:on_client_subscribe/4},
        {'client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4},
        {'session.created', fun ?MODULE:on_session_created/3},
        {'session.subscribed', fun ?MODULE:on_session_subscribed/4},
        {'session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4},
        {'session.terminated', fun ?MODULE:on_session_terminated/4},
        {'message.publish', fun ?MODULE:on_message_publish/2},
        {'message.delivered', fun ?MODULE:on_message_delivered/4},
        {'message.acked', fun ?MODULE:on_message_acked/4}
    ],
    %% The accumulator only carries the most recent result, so the fold
    %% yields the outcome of the last removal.
    lists:foldl(
        fun({HookPoint, Callback}, _Previous) ->
            emqttd:unhook(HookPoint, Callback)
        end,
        ok,
        Callbacks
    ).
%% ===================================================================
%% ekaf_init https://github.com/helpshift/ekaf
%% ===================================================================
%% @doc Configures and starts the ekaf Kafka client.
%%
%% Reads the `kafka' entry from this plugin's application environment,
%% copies its `bootstrap_broker' and `partition_strategy' values into the
%% ekaf application environment, registers ?TEST_TOPIC as a bootstrap topic
%% and then starts kafkamocker, gproc and ekaf. The argument is ignored.
%%
%% Crashes with a badmatch if the `kafka' entry is missing or if any of the
%% three applications fails to start. Returns the result of the final
%% io:format/2 call.
ekaf_init(_Env) ->
    %% Only load ekaf here so its environment can be set before it is
    %% started below. The result is deliberately ignored: an
    %% already-loaded application is not an error for our purposes.
    _ = application:load(ekaf),
    %% Get parameters
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    BootstrapBroker = proplists:get_value(bootstrap_broker, Kafka),
    PartitionStrategy = proplists:get_value(partition_strategy, Kafka),
    %% Set partition strategy, like
    %% application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin)
    application:set_env(ekaf, ekaf_partition_strategy, PartitionStrategy),
    %% Set broker host and port, like
    %% application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092})
    application:set_env(ekaf, ekaf_bootstrap_broker, BootstrapBroker),
    %% ekaf documents ekaf_bootstrap_topics as a LIST of topic binaries, so
    %% the single topic is wrapped in a list rather than passed bare.
    application:set_env(ekaf, ekaf_bootstrap_topics, [?TEST_TOPIC]),
    %% NOTE(review): kafkamocker appears to be a Kafka mock used by ekaf's
    %% tests; confirm it really needs to be started in production.
    {ok, _} = application:ensure_all_started(kafkamocker),
    {ok, _} = application:ensure_all_started(gproc),
    {ok, _} = application:ensure_all_started(ekaf),
    io:format("Init ekaf with ~p~n", [BootstrapBroker]).
%% @doc Converts a flag to an integer for JSON output: `true' becomes 1,
%% `false' becomes 0 and an integer is returned as-is. Any other argument
%% fails with function_clause.
i(Flag) when is_boolean(Flag) ->
    case Flag of
        true -> 1;
        false -> 0
    end;
i(Number) when is_integer(Number) ->
    Number.
%% @doc Extracts the client id from a message's `from' value.
%% For a `{ClientId, Username}' pair the first element is returned; any
%% other term is returned unchanged.
c({ClientId, _Username}) -> ClientId;
c(From) -> From.
%% @doc Extracts the username from a message's `from' value.
%% For a `{ClientId, Username}' pair the second element is returned; any
%% other term is returned unchanged (so the "username" then equals whatever
%% `from' was).
u({_ClientId, Username}) -> Username;
u(From) -> From.
Copy the code
\
The complete source code can be downloaded from: download.csdn.net/download/li…
\
—
Reference articles: www.cnblogs.com/wunaozai/p/…
Github.com/msdevanms/e… \
\