Requirements
A Kafka service is available, and its data (GPS records) is also stored on a local disk as a text file. The goal is to implement a real-time vehicle map based on ECharts.
Analysis
- Front-end real-time display: WebSocket is used so that the server can push data to the front end for display
- Data is obtained through Kafka's Java client and pushed to the front end through WebSocket.
WebSocket
Introduction
WebSocket is a protocol, introduced alongside HTML5, for full-duplex communication over a single TCP connection. With the WebSocket API, the browser and the server only need to complete one handshake; after that a persistent channel stays open between them, and data can be sent in both directions.
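To make the single handshake more concrete, here is a minimal sketch of the one computation the server performs during it: deriving the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key, as described in RFC 6455. Tomcat does this internally, so application code never needs it; the class and method names below are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Illustrative only: the container normally performs this step for you. */
final class HandshakeSketch {

    // Fixed GUID that RFC 6455 appends to the client's key.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455, section 1.3.
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

Once the client has checked this header, the same TCP connection is kept open and both sides can send frames at any time.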
Development
- The server side
package com.ykkj.weiyi.socket;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.SpringConfigurator;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * @ServerEndpoint is a class-level annotation that defines the current class as a WebSocket server endpoint.
 * The annotation's value is the URL on which the server listens for connections; clients use this URL to connect to the WebSocket server.
 */
@ServerEndpoint(value = "/websocket")
public class CommodityServer {
// Static variable, used to record the current number of online connections. It should be designed to be thread-safe.
private static int onlineCount = 0;
// A thread-safe Set from java.util.concurrent that holds the CommodityServer object of each client. To let the server talk to a single client, a Map keyed by user identity can be used instead
public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();
// A connection session with a client through which to send data to the client
private Session session;
/**
 * Called when a connection is successfully established.
 * @param session Optional parameter. The connection session with a client, through which data is sent to that client
 */
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this); // Add to the set
addOnlineCount(); // Online count increases by 1
System.out.println("New connection added! The number of current online users is " + getOnlineCount());
}
/**
 * Called when the connection is closed.
 */
@OnClose
public void onClose() {
webSocketSet.remove(this); // Remove from the set
subOnlineCount(); // Online count decreases by 1
System.out.println("A connection was closed! The number of current online users is " + getOnlineCount());
}
/**
 * Called when a message is received from the client.
 * @param message The message sent by the client
 * @param session Optional parameter
 */
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("Message from client: " + message);
// Broadcast the message to every client
for (CommodityServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
continue;
}
}
}
/**
 * Called when an error occurs.
 * @param session
 * @param error
 */
@OnError
public void onError(Session session, Throwable error) {
System.out.println("Error occurred");
error.printStackTrace();
}
/**
 * Unlike the methods above, this one has no annotation. It is a helper method added as needed.
 * @param message
 * @throws IOException
 */
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
CommodityServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
CommodityServer.onlineCount--;
}
}
- The front end
<html>
<head>
    <title>Tomcat implementation of Java backend WebSocket</title>
</head>
<body>
    Welcome<br/>
    <input id="text" type="text"/>
    <button onclick="send()">Send a message</button>
    <hr/>
    <button onclick="closeWebSocket()">Close the WebSocket connection</button>
    <hr/>
    <div id="message"></div>
</body>
<script type="text/javascript">
    var websocket = null;
    // Check whether the current browser supports WebSocket
    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://localhost:8081/onepic/websocket");
    } else {
        alert('The current browser does not support WebSocket');
    }
    // Callback for a connection error
    websocket.onerror = function () {
        setMessageInnerHTML("WebSocket connection error");
    };
    // Callback for a successfully established connection
    websocket.onopen = function () {
        setMessageInnerHTML("WebSocket connection successful");
    };
    // Callback for a received message
    websocket.onmessage = function (event) {
        setMessageInnerHTML(event.data);
    };
    // Callback for a closed connection
    websocket.onclose = function () {
        setMessageInnerHTML("WebSocket connection closed");
    };
    // Listen for the window closing and actively close the WebSocket connection,
    // so that the server does not throw an exception because of a connection left open.
    window.onbeforeunload = function () {
        closeWebSocket();
    };
    // Display a message on the web page
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
    // Close the WebSocket connection
    function closeWebSocket() {
        websocket.close();
    }
    // Send a message
    function send() {
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>
Test
Points to note
- webSocketSet is declared as a public static variable so that other classes can access it
public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();
- The server side is implemented with the annotations @ServerEndpoint, @OnOpen, @OnClose, @OnMessage and @OnError
- Tomcat 7.0.47 and later support WebSocket 1.0
- Add the JAR dependency to the POM
<dependency>
    <groupId>javax</groupId>
    <artifactId>javaee-api</artifactId>
    <version>7.0</version>
    <scope>provided</scope>
</dependency>
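A comment in the server class mentions that a Map keyed by user identity can replace the Set when the server needs to talk to a single client. The following is a minimal sketch of that idea, not the code used in this project. ClientRegistrySketch and its Sender interface are hypothetical names; Sender stands in for an endpoint's sendMessage method so that the example runs without a WebSocket container.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: a client registry keyed by user identity (all names are hypothetical). */
final class ClientRegistrySketch {

    /** Stand-in for an endpoint's sendMessage method. */
    interface Sender {
        void send(String message) throws IOException;
    }

    // ConcurrentHashMap is safe for concurrent register, unregister and send calls.
    private final Map<String, Sender> clients = new ConcurrentHashMap<>();

    void register(String userId, Sender sender) {
        clients.put(userId, sender);
    }

    void unregister(String userId) {
        clients.remove(userId);
    }

    /** The online count is simply the size of the map. */
    int onlineCount() {
        return clients.size();
    }

    /** Sends to one client; returns false if the user is unknown or the send fails. */
    boolean sendTo(String userId, String message) {
        Sender sender = clients.get(userId);
        if (sender == null) {
            return false;
        }
        try {
            sender.send(message);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Sends to every client and returns how many sends succeeded. */
    int broadcast(String message) {
        int delivered = 0;
        for (Sender sender : clients.values()) {
            try {
                sender.send(message);
                delivered++;
            } catch (IOException e) {
                // One failing client does not stop delivery to the others.
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        registry.register("car-1", msg -> System.out.println("car-1 <- " + msg));
        registry.register("car-2", msg -> System.out.println("car-2 <- " + msg));
        registry.sendTo("car-1", "hello");
        registry.broadcast("position update");
    }
}
```

In the real endpoint, register would be called from the @OnOpen method and unregister from @OnClose, with the user identity taken from wherever the application keeps it (for example a path or query parameter), which this sketch does not cover.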
Kafka
Introduction
Kafka is a distributed, partitioned, replicated messaging system.
Development
For the environment setup, please refer to my notes on Kafka Environment Setup (Windows)
- Kafka client for Java
package com.ykkj.weiyi.socket;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
public class ConsumerKafka extends Thread {
private KafkaConsumer<String, String> consumer;
private String topic = "test.topic";
public ConsumerKafka() {
}
@Override
public void run() {
// Set the Kafka consumer parameters
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "ytna");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "15000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create a consumer object
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(this.topic));
// An endless loop of kafka consumption
while (true) {
try {
// Consume data and set the timeout
ConsumerRecords<String, String> records = consumer.poll(100);
// Process the consumed messages
for (ConsumerRecord<String, String> record : records) {
// Send the message to every client
for (CommodityServer webSocket : webSocketSet) {
webSocket.sendMessage(record.value());
}
}
} catch (IOException e) {
System.out.println(e.getMessage());
continue;
}
}
}
public void close() {
try {
consumer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
// For testing only. When running under Tomcat, start the thread another way
public static void main(String[] args) {
ConsumerKafka consumerKafka = new ConsumerKafka();
consumerKafka.start();
}
}
Note the topic and bootstrap.servers configuration
- The calling class
package com.ykkj.weiyi.socket;
public class RunThread {
public RunThread() {
ConsumerKafka kafka = new ConsumerKafka();
kafka.start();
}
}
- The web.xml configuration
<listener>
<listener-class>com.ykkj.weiyi.socket.RunThread</listener-class>
</listener>
Test
Points to note
- ConsumerKafka must be started through the listener configured in web.xml so that it runs inside the web application; otherwise it cannot access the webSocketSet variable that CommodityServer fills
- The webSocketSet variable is referenced through a static import
import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
- Note the topic and bootstrap.servers configuration
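Since the topic and bootstrap.servers values differ between environments, one option is to keep the values from this article as defaults and allow them to be overridden from outside. The sketch below only uses java.util.Properties and is an assumption about how this could be organized, not the code used in this project. ConsumerConfigSketch is a made-up class, and the "topic" override key is invented for this example (it is not a Kafka setting).

```java
import java.util.Properties;

/** Sketch: consumer settings with overridable defaults (class and "topic" key are hypothetical). */
final class ConsumerConfigSketch {

    /** Builds the consumer properties, taking bootstrap.servers and group.id from overrides if present. */
    static Properties consumerProps(Properties overrides) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                overrides.getProperty("bootstrap.servers", "localhost:9092"));
        props.setProperty("group.id", overrides.getProperty("group.id", "ytna"));
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "15000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Returns the topic to subscribe to, defaulting to the one used in this article. */
    static String topic(Properties overrides) {
        return overrides.getProperty("topic", "test.topic");
    }

    public static void main(String[] args) {
        // JVM system properties act as overrides here, e.g. -Dbootstrap.servers=host:9092
        Properties overrides = System.getProperties();
        System.out.println(consumerProps(overrides).getProperty("bootstrap.servers"));
        System.out.println(topic(overrides));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of the hard-coded one.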
Conclusion
This is only a technical validation, not a real implementation. In a real scenario, the Kafka consumer would need to filter out data that does not meet the current specification and then assemble the remaining data into the format the front end requires for display.
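As a rough idea of what that cleaning and assembling step could look like, here is a minimal sketch. The article does not describe the GPS record format, so the input format "vehicleId,longitude,latitude" is purely an assumption. The output shape (a name plus a value array holding longitude and latitude) is one that ECharts scatter series on a map commonly use, but the real format depends on the chart configuration. GpsRecordSketch is a made-up name.

```java
import java.util.Locale;
import java.util.Optional;

/** Sketch: clean one raw GPS line and format it for the front end (format and names are assumed). */
final class GpsRecordSketch {

    /**
     * Assumed input format: "vehicleId,longitude,latitude".
     * Returns a JSON object string, or empty if the line is malformed or out of range.
     */
    static Optional<String> toJson(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split(",");
        if (parts.length != 3 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            double lng = Double.parseDouble(parts[1].trim());
            double lat = Double.parseDouble(parts[2].trim());
            // Written in negated form so that NaN is rejected as well.
            if (!(lng >= -180 && lng <= 180) || !(lat >= -90 && lat <= 90)) {
                return Optional.empty();
            }
            // Minimal escaping of backslashes and quotes; a JSON library is the safer choice.
            String id = parts[0].trim().replace("\\", "\\\\").replace("\"", "\\\"");
            return Optional.of(String.format(Locale.ROOT,
                    "{\"name\":\"%s\",\"value\":[%.6f,%.6f]}", id, lng, lat));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(toJson("car-1,116.5,39.25").orElse("rejected"));
        System.out.println(toJson("car-2,abc,39.25").orElse("rejected"));
    }
}
```

In the consumer loop, record.value() would be passed through such a method first, and only records that survive the check would be sent to the WebSocket clients.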
References
https://blog.csdn.net/lw_ghy/article/details/73252904
https://blog.csdn.net/liu857279611/article/details/70157012