Starting with v4.1, the EMQ X MQTT server provides a dedicated multilingual support plug-in emqX_extension_hook that now supports the use of other programming languages to handle hook events in EMQ X, Developers can use Python or Java to quickly develop their own plug-ins, which can be extended based on official functions to meet their own business scenarios. Such as:
- Verify the login permission of a client: When the client is connected, the corresponding function is triggered. After obtaining the client information based on the parameters, the client can read the database and compare the parameters to determine whether the client has the login permission
- Record the client online status and online and offline history: When the client status changes, the corresponding function is triggered to obtain the client information and rewrite the client online status in the database
- Verify the operation permission of PUB/SUB of a client: When publishing or subscribing, the corresponding function is triggered to obtain the client information and the current topic and determine whether the client has the corresponding operation permission
- Process Sessions and Message events, realize subscription relationship and Message processing/storage: Trigger corresponding functions when messages are published and status changes, obtain the current client information, Message status and Message content, and forward them to Kafka or database for storage.
Note: Message class hooks are supported only in the Enterprise edition.
Python and Java drivers are based on Erlang/ OTP-port interprocess communication, which has very high throughput performance. This paper takes Java extension as an example to introduce the use of EMQ X cross-language extension.
Java extension usage examples
requirements
- JDK 1.8 or later must be installed on the EMQ X server
Begin to use
- Creating a Java project
- Download the IO. Emqx.extension. jar and erlport.jar files
- Add the SDK
io.emqx.extension.jar
anderlport.jar
To project dependencies - copy
examples/SampleHandler.java
Into your project - According to the SDK
SampleHandler.java
Write business code to ensure successful compilation
The deployment of
After compiling all the source code, you need to deploy the SDK and code files into EMQ X:
- copy
io.emqx.extension.jar
到emqx/data/extension
directory - Will be compiled
.class
Files, for exampleSampleHandler.class
Copied to theemqx/data/extension
directory - Modify the
emqx/etc/plugins/emqx_extension_hook.conf
Configuration file:
exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler
Copy the code
Start the emqx_extension_hook plug-in and it will not start properly if the configuration is wrong or the Java code is written incorrectly. After startup, try to establish an MQTT connection and observe the business.
The sample
Here is SampleHandler. Java sample program, the program the DefaultCommunicationHandler class inherits from the SDK. This sample code demonstrates how to mount all the hooks in an EMQ X system:
import emqx.extension.java.handler.*;
import emqx.extension.java.handler.codec.*;
import emqx.extension.java.handler.ActionOptionConfig.Keys;
public class SampleHandler extends DefaultCommunicationHandler {
@Override
public ActionOptionConfig getActionOption(a) {
ActionOptionConfig option = new ActionOptionConfig();
option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
return option;
}
// Clients
@Override
public void onClientConnect(ConnInfo connInfo, Property[] props) {
System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props);
}
@Override
public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props);
}
@Override
public void onClientConnected(ClientInfo clientInfo) {
System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
}
@Override
public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
}
// Determine the authentication result, return true or false
@Override
public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);
return true;
}
// Determine the result of ACL check, return true or false
@Override
public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);
return true;
}
@Override
public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
}
@Override
public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
}
// Sessions
@Override
public void onSessionCreated(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
}
@Override
public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
}
@Override
public void onSessionResumed(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionDiscarded(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionTakeovered(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
}
// Messages
@Override
public Message onMessagePublish(Message message) {
System.err.printf("[Java] onMessagePublish: message: %s\n", message);
return message;
}
@Override
public void onMessageDropped(Message message, Reason reason) {
System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
}
@Override
public void onMessageDelivered(ClientInfo clientInfo, Message message) {
System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
}
@Override
public void onMessageAcked(ClientInfo clientInfo, Message message) {
System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message); }}Copy the code
SampleHandler consists of two main parts:
-
The getActionOption method is overridden. This method configures message-related hooks, specifying a list of topics that need to take effect.
Configuration items Corresponding to the hook MESSAGE_PUBLISH_TOPICS message_publish MESSAGE_DELIVERED_TOPICS message_delivered MESSAGE_ACKED_TOPICS message_acked MESSAGE_DROPPED_TOPICS message_dropped -
Overloading the on
methods, which are the callbacks that actually handle hook events. These methods are named by prefixing each hookName variant with on, or by using CamelCase with the underscore removed from the hookName, for example, The corresponding function of the hook client_connect is called onClientConnect. Events generated by the EMQ X client, such as connect, publish, subscribe, etc., are eventually distributed to the hook event callback functions, which can then operate on the properties and states. Only the parameters are printed out in the sample program. If you only care about part of the hook event, you only need to overload the callback function for that part of the hook event. You do not need to overload all of the callback functions.
The timing of each callback function and the list of supported Hooks are exactly the same as those built into EMQ X, see: reset-emq X
In achieving their own extensions, the simplest way is also inherited DefaultCommunicationHandler parent class, the class of binding of the hook and the callback function encapsulation, and further encapsulates the callback function of parameters involved in data structures, used for quick learning.
Development of advanced
If the Java extensions of controllable more demanding, DefaultCommunicationHandler class have been unable to meet demand, can be realized through CommunicationHandler interface, from the underlying code logic control, more write more flexible extension program.
package emqx.extension.java.handler;
public interface CommunicationHandler {
public Object init();
public void deinit();
}
Copy the code
init()
Method: Used for initialization, declaring which hooks the extension needs to mount, and the mounted configurationdeinit()
Method: Used for logging out.
For details on the data format, see the design document.
Copyright: EMQ
Original link: www.emqx.io/cn/blog/dev…