Maven introduces dependencies

<? The XML version = "1.0" encoding = "utf-8"? > < project XMLNS = "http://maven.apache.org/POM/4.0.0" XMLNS: xsi = "http://www.w3.org/2001/XMLSchema-instance" Xsi: schemaLocation = "http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion > 4.0.0 < / modelVersion > < the parent > < groupId > org. Springframework. Boot < / groupId > The < artifactId > spring - the boot - starter - parent < / artifactId > < version > 2.2.2. RELEASE < / version > < relativePath / > <! -- lookup parent from repository --> </parent> <groupId>com.lixy</groupId> <artifactId>websocket</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo-websocket</name> <description>Demo project for Spring Boot</description> < properties > < Java version > 1.8 < / Java version > < fastjson. Version > 1.2.61 < / fastjson version > < Commons - lang3 version > 3.9 < / Commons - lang3. Version > < / properties > < dependencies > < the dependency > <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <exclusions> <! -- Exclude tomcat --> <artifactId> Spring-boot-starter-tomcat </artifactId> <groupId>org.springframework.boot</groupId> </exclusion> </exclusions> </dependency> <! <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> </dependency> <! --fast json--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>compile</scope> </dependency> </dependencies> <repositories> <repository> <id>ali-snapshots</id> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-snapshots</id> <url>http://repo.spring.io/libs-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>ali-snapshots</id> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <snapshots> <enabled>true</enabled> </snapshots> </pluginRepository> <pluginRepository> <id>spring-snapshots</id> <url>http://repo.spring.io/libs-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </pluginRepository> </pluginRepositories> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>Copy the code

Websocket config configuration

import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @date 2021/5/31 15:09 * @version: 1.0 */ @configuration @slf4j @Component public class WebSocketConfig {@bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}Copy the code

Core classes MyWebSocketService

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Websocket server
 *
 */
@Slf4j
@ServerEndpoint(value = "/websocket/{key}")
@Component
public class MyWebSocketService {

    private static final String KEY_CODE = "code";
    private static final String KEY_DATA = "data";
    private static final String NULL_KEY = "null";
    /**
     * 心跳连接有效时间(毫秒)
     */
    private static final Long BEAT_HEART_DURATION_TIME_MILLIS = 10 * 60 * 1000L;

    /**
     * 用来记录当前在线连接数
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
     * 存储结构为key(mac或ip等),连接
     */
    public static Map<String, Session> clients = new ConcurrentHashMap<String, Session>();

    /**
     * concurrent包的线程安全Map,用来存放每个客户端对应的Session对象。
     * 存储结构为连接,key(mac或ip等)
     */
    private static Map<Session, String> sessionMap = new ConcurrentHashMap<Session, String>();

    private static Map<String, Session> oldClients = new ConcurrentHashMap<String, Session>();


    private static Map<Session, Long> sessionBeatheartMap = new ConcurrentHashMap<Session, Long>();

    /**
     * 在客户初次连接时触发,
     * 这里会为客户端创建一个session,这个session并不是我们所熟悉的httpsession
     *
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("key") String key, Session session) {
        if (StringUtils.isEmpty(key) || NULL_KEY.equalsIgnoreCase(key)) {
            try {
                log.warn("[key={}]非法,禁止连接!!!", key);
                session.close();
            } catch (IOException e) {
            }
        }
        if (clients.containsKey(key)) {
            //删除原有连接
            destroyOldSession(key);
        }
        //在线数加1
        addOnlineCount();
        clients.put(key, session);
        sessionMap.put(session, key);
        sessionBeatheartMap.put(session, System.currentTimeMillis());
        log.info("有新连接[key={}]加入!当前在线连接数为{}", key, getOnlineCount());

    }

    /**
     * 在客户端与服务器端断开连接时触发。
     */
    @OnClose
    public void onClose(Session session) {
        String key = sessionMap.get(session);
        if (StringUtils.isNotEmpty(key)) {
            if (clients.containsKey(key)) {
                clients.remove(key);
                //在线数减1
                subOnlineCount();
            }
            sessionMap.remove(session);
            sessionBeatheartMap.remove(session);
            log.info("有一连接 [key={}]关闭!当前在线连接数为{}", key, getOnlineCount());
            /**通知系统断开连接**/
            destroyOldSession(key);
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 消息内容
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        if (StringUtils.isEmpty(message)) {
            //通过message空进行心跳检测
            log.info("接收到会话[{}]心跳消息", session.getId());
            sessionBeatheartMap.put(session, System.currentTimeMillis());
            return;
        }
        String key = sessionMap.get(session);
        log.info("接收到 [key={}] 通信消息:{}", key, message);
        try {
            //发送给指定用户,json格式
            JSONObject requestJson = JSONObject.parseObject(message);

            if (!requestJson.containsKey(KEY_CODE)) {
                return;
            }
            Integer clientCode = requestJson.getInteger(KEY_CODE);
            String data = requestJson.getString(KEY_DATA);

            switch (clientCode) {
                case WebSocketMessageType.SERVER_PUSH_START:
                    //消息处理
                    break;
                case WebSocketMessageType.SERVER_PUSH_STOP:
                    //消息处理
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
        }
    }

    /**
     * 定时发送信息到 所有已连接 websocket
     */
    @Scheduled(cron = "0 */5 * * * ?")
    public void processTerminalInformation() {
        if (clients.isEmpty()) {
            return;
        }
        clients.forEach((k, v) -> {
            try {
                //异步发送
                v.getAsyncRemote().sendText("我的内容");
            } catch (Exception e) {
                /**连接不可用,清理连接**/
                destroyOldSession(k);
            }
        });
    }

    /**
     * 在线时长计时,心跳检测
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void processOnlineTime() {

        oldClients.forEach((k, v) -> {
            try {
                Long lastBeatTime = sessionBeatheartMap.get(v);
                if (lastBeatTime == null || (System.currentTimeMillis() - lastBeatTime) > BEAT_HEART_DURATION_TIME_MILLIS) {
                    /**超过90秒未收到空消息,KEY 设备已断开连接**/
                    destroyOldSession(k);
                }
            } catch (Exception e) {
                /**连接不可用,清理连接**/
                destroyOldSession(k);
            }
        });
        oldClients = clients;
    }

    /**
     * 定点发送消息
     *
     * @param key     key(key为设备mac,ip等唯一性参数)
     * @param message 消息文本
     * @throws IOException
     */
    public boolean sendMessageTo(String key, String message) {
        Session session = clients.get(key);
        log.info("websocket尝试向 KEY[{}]推送消息[{}]...", key, message);
        if (session == null || !session.isOpen() || session.getAsyncRemote() == null) {
            log.warn("(key:{})无可用会话", key);
            destroyOldSession(key);
            return false;
        }
        try {
            session.getBasicRemote().sendText(message);
            log.info("websocket向 KEY[{}]推送消息[{}]成功", key, message);
            return true;
        } catch (Exception e) {
            log.error("websocket向 KEY [{}]推送消息[{}]失败", key, message);
            log.error("websocket向 KEY [" + key + "]发送消息遇到问题", e);
            /**连接不可用,清理连接**/
            destroyOldSession(key);
            return false;
        }
    }


    /**
     * 发生错误时调用此方法
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:" + error.getMessage(), error);
        if (session == null) {
            return;
        }
        if (!sessionMap.containsKey(session)) {
            return;
        }
        String key = sessionMap.get(session);
        if (StringUtils.isEmpty(key)) {
            destroyOldSession(key);
        }
    }

    private void destroyOldSession(String key) {
        Session oldSession = clients.get(key);
        if (oldSession != null) {
            if (clients.containsKey(key)) {
                subOnlineCount();
                clients.remove(key);
                if (oldSession != null) {
                    sessionMap.remove(oldSession);
                    sessionBeatheartMap.remove(oldSession);
                }
                try {
                    oldSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "已断开连接!"));
                } catch (IOException e) {
                }
            }
        }
    }


    /**
     * 获取在线key地址
     *
     * @return
     */
    public List<String> getOnlineWebKey() {
        List<String> keyList = new ArrayList<>();
        if (clients.isEmpty()) {
            return keyList;
        }
        clients.forEach((k, v) -> {
            keyList.add(k);
        });
        return keyList;
    }

    /**
     * 得到当前连接人数
     *
     * @return
     */
    public static synchronized AtomicInteger getOnlineCount() {
        return onlineCount;
    }

    /**
     * 增加连接人数
     */
    public static synchronized void addOnlineCount() {
        onlineCount.incrementAndGet();
    }

    /**
     * 减少连接人数
     */
    public static synchronized void subOnlineCount() {
        onlineCount.decrementAndGet();
    }


}

Copy the code

The message body

import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; /** * WebSocket message */ @toString @data @noargsconstructor @allargsconstructor Public class WebSocketMessage {/** * message encoding */ private int code; /** * message body */ private Object data; public WebSocketMessage setQueryParams(Object params){ this.data=params; return this; } public WebSocketMessage setData(Object data) { this.data = data; return this; }}Copy the code

Message type

/** ** public interface WebSocketMessageType {/** ** start command */ int SERVER_PUSH_START = 400001; /** * stop command */ int SERVER_PUSH_STOP = 400002; }Copy the code

Start the class

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @enablesCheduling @springBootApplication Public Class DemoWebsocketApplication {public static void main(String[] args) { SpringApplication.run(DemoWebsocketApplication.class, args); }}Copy the code

Among them, the processOnlineTime () whether detection timeout, determine whether disconnected, processTerminalInformation () time send a message to the connected client

Finally through the ws: / / IP: port/web (servlet path)/websocket / 111 registered client online test tools: https://www.idcd.com/tool/socketCopy the code