Maven dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.lixy</groupId>
    <artifactId>websocket</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-websocket</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <fastjson.version>1.2.61</fastjson.version>
        <commons-lang3.version>3.9</commons-lang3.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <exclusions>
                <!-- Exclude the embedded Tomcat; Undertow is added below instead -->
                <exclusion>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Undertow as the embedded servlet container -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>ali-snapshots</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <url>http://repo.spring.io/libs-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>ali-snapshots</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-snapshots</id>
            <url>http://repo.spring.io/libs-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <!-- Lombok is only needed at compile time; keep it out of the repackaged jar -->
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
WebSocket configuration class
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @date 2021/5/31 15:09 * @version: 1.0 */ @configuration @slf4j @Component public class WebSocketConfig {@bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}Copy the code
Core class: MyWebSocketService
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Websocket server
*
*/
@Slf4j
@ServerEndpoint(value = "/websocket/{key}")
@Component
public class MyWebSocketService {
private static final String KEY_CODE = "code";
private static final String KEY_DATA = "data";
private static final String NULL_KEY = "null";
/**
* 心跳连接有效时间(毫秒)
*/
private static final Long BEAT_HEART_DURATION_TIME_MILLIS = 10 * 60 * 1000L;
/**
* 用来记录当前在线连接数
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
/**
* concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
* 存储结构为key(mac或ip等),连接
*/
public static Map<String, Session> clients = new ConcurrentHashMap<String, Session>();
/**
* concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
* 存储结构为连接,key(mac或ip等)
*/
private static Map<Session, String> sessionMap = new ConcurrentHashMap<Session, String>();
private static Map<String, Session> oldClients = new ConcurrentHashMap<String, Session>();
private static Map<Session, Long> sessionBeatheartMap = new ConcurrentHashMap<Session, Long>();
/**
* 在客户初次连接时触发,
* 这里会为客户端创建一个session,这个session并不是我们所熟悉的httpsession
*
* @param session
*/
@OnOpen
public void onOpen(@PathParam("key") String key, Session session) {
if (StringUtils.isEmpty(key) || NULL_KEY.equalsIgnoreCase(key)) {
try {
log.warn("[key={}]非法,禁止连接!!!", key);
session.close();
} catch (IOException e) {
}
}
if (clients.containsKey(key)) {
//删除原有连接
destroyOldSession(key);
}
//在线数加1
addOnlineCount();
clients.put(key, session);
sessionMap.put(session, key);
sessionBeatheartMap.put(session, System.currentTimeMillis());
log.info("有新连接[key={}]加入!当前在线连接数为{}", key, getOnlineCount());
}
/**
* 在客户端与服务器端断开连接时触发。
*/
@OnClose
public void onClose(Session session) {
String key = sessionMap.get(session);
if (StringUtils.isNotEmpty(key)) {
if (clients.containsKey(key)) {
clients.remove(key);
//在线数减1
subOnlineCount();
}
sessionMap.remove(session);
sessionBeatheartMap.remove(session);
log.info("有一连接 [key={}]关闭!当前在线连接数为{}", key, getOnlineCount());
/**通知系统断开连接**/
destroyOldSession(key);
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 消息内容
*/
@OnMessage
public void onMessage(Session session, String message) {
if (StringUtils.isEmpty(message)) {
//通过message空进行心跳检测
log.info("接收到会话[{}]心跳消息", session.getId());
sessionBeatheartMap.put(session, System.currentTimeMillis());
return;
}
String key = sessionMap.get(session);
log.info("接收到 [key={}] 通信消息:{}", key, message);
try {
//发送给指定用户,json格式
JSONObject requestJson = JSONObject.parseObject(message);
if (!requestJson.containsKey(KEY_CODE)) {
return;
}
Integer clientCode = requestJson.getInteger(KEY_CODE);
String data = requestJson.getString(KEY_DATA);
switch (clientCode) {
case WebSocketMessageType.SERVER_PUSH_START:
//消息处理
break;
case WebSocketMessageType.SERVER_PUSH_STOP:
//消息处理
break;
default:
break;
}
} catch (Exception e) {
}
}
/**
* 定时发送信息到 所有已连接 websocket
*/
@Scheduled(cron = "0 */5 * * * ?")
public void processTerminalInformation() {
if (clients.isEmpty()) {
return;
}
clients.forEach((k, v) -> {
try {
//异步发送
v.getAsyncRemote().sendText("我的内容");
} catch (Exception e) {
/**连接不可用,清理连接**/
destroyOldSession(k);
}
});
}
/**
* 在线时长计时,心跳检测
*/
@Scheduled(cron = "0 */1 * * * ?")
public void processOnlineTime() {
oldClients.forEach((k, v) -> {
try {
Long lastBeatTime = sessionBeatheartMap.get(v);
if (lastBeatTime == null || (System.currentTimeMillis() - lastBeatTime) > BEAT_HEART_DURATION_TIME_MILLIS) {
/**超过90秒未收到空消息,KEY 设备已断开连接**/
destroyOldSession(k);
}
} catch (Exception e) {
/**连接不可用,清理连接**/
destroyOldSession(k);
}
});
oldClients = clients;
}
/**
* 定点发送消息
*
* @param key key(key为设备mac,ip等唯一性参数)
* @param message 消息文本
* @throws IOException
*/
public boolean sendMessageTo(String key, String message) {
Session session = clients.get(key);
log.info("websocket尝试向 KEY[{}]推送消息[{}]...", key, message);
if (session == null || !session.isOpen() || session.getAsyncRemote() == null) {
log.warn("(key:{})无可用会话", key);
destroyOldSession(key);
return false;
}
try {
session.getBasicRemote().sendText(message);
log.info("websocket向 KEY[{}]推送消息[{}]成功", key, message);
return true;
} catch (Exception e) {
log.error("websocket向 KEY [{}]推送消息[{}]失败", key, message);
log.error("websocket向 KEY [" + key + "]发送消息遇到问题", e);
/**连接不可用,清理连接**/
destroyOldSession(key);
return false;
}
}
/**
* 发生错误时调用此方法
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误:" + error.getMessage(), error);
if (session == null) {
return;
}
if (!sessionMap.containsKey(session)) {
return;
}
String key = sessionMap.get(session);
if (StringUtils.isEmpty(key)) {
destroyOldSession(key);
}
}
private void destroyOldSession(String key) {
Session oldSession = clients.get(key);
if (oldSession != null) {
if (clients.containsKey(key)) {
subOnlineCount();
clients.remove(key);
if (oldSession != null) {
sessionMap.remove(oldSession);
sessionBeatheartMap.remove(oldSession);
}
try {
oldSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "已断开连接!"));
} catch (IOException e) {
}
}
}
}
/**
* 获取在线key地址
*
* @return
*/
public List<String> getOnlineWebKey() {
List<String> keyList = new ArrayList<>();
if (clients.isEmpty()) {
return keyList;
}
clients.forEach((k, v) -> {
keyList.add(k);
});
return keyList;
}
/**
* 得到当前连接人数
*
* @return
*/
public static synchronized AtomicInteger getOnlineCount() {
return onlineCount;
}
/**
* 增加连接人数
*/
public static synchronized void addOnlineCount() {
onlineCount.incrementAndGet();
}
/**
* 减少连接人数
*/
public static synchronized void subOnlineCount() {
onlineCount.decrementAndGet();
}
}
Copy the code
The message body
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; /** * WebSocket message */ @toString @data @noargsconstructor @allargsconstructor Public class WebSocketMessage {/** * message encoding */ private int code; /** * message body */ private Object data; public WebSocketMessage setQueryParams(Object params){ this.data=params; return this; } public WebSocketMessage setData(Object data) { this.data = data; return this; }}Copy the code
Message type
/**
 * Message type codes carried in the {@code code} field of a WebSocket message.
 */
public interface WebSocketMessageType {

    /**
     * Start command.
     */
    int SERVER_PUSH_START = 400001;

    /**
     * Stop command.
     */
    int SERVER_PUSH_STOP = 400002;
}
Application startup class
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @enablesCheduling @springBootApplication Public Class DemoWebsocketApplication {public static void main(String[] args) { SpringApplication.run(DemoWebsocketApplication.class, args); }}Copy the code
In MyWebSocketService, processOnlineTime() checks each connection for a heartbeat timeout to decide whether the client has disconnected, and processTerminalInformation() periodically sends a message to all connected clients.
Finally, register a client by connecting to ws://IP:port/web (servlet path)/websocket/111. An online test tool is available at https://www.idcd.com/tool/socket