Using WebSocket in a load-balanced (multi-instance) environment

1. Reason

In some business scenarios the page has to reflect back-end operations in real time, which calls for WebSocket.

With a single back-end instance this works without any problems. Once the back end is load-balanced behind nginx, however, the front end may no longer receive the messages the back end pushes. A WebSocket is a long-lived connection, and its session is stored only on the instance that accepted it; the other instances behind the load balancer do not hold this session. To solve this, we use Redis publish/subscribe to get the effect of session sharing: any instance can publish a message, and the instance that holds the session forwards it to the client.
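The idea can be sketched in plain Java before bringing in Spring. This is a simulation only: `InMemoryBroker` is a hypothetical stand-in for Redis pub/sub, `ServerNode` stands in for one back-end instance, and the `"TOPIC"` prefix mirrors the channel name used later in this article. A list of strings plays the role of a WebSocket session.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for Redis pub/sub: a message reaches every subscriber of its channel. */
class InMemoryBroker {
    interface Subscriber { void onMessage(String body); }

    private final Map<String, List<Subscriber>> channels = new ConcurrentHashMap<>();

    void subscribe(String channel, Subscriber subscriber) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    void publish(String channel, String body) {
        for (Subscriber s : channels.getOrDefault(channel, Collections.emptyList())) {
            s.onMessage(body);
        }
    }
}

/** One back-end instance behind the load balancer; it only holds its own sessions. */
class ServerNode {
    static final String TOPIC_PREFIX = "TOPIC"; // assumed channel prefix

    private final InMemoryBroker broker;
    // loginName -> messages written to that user's simulated WebSocket session
    private final Map<String, List<String>> sessions = new HashMap<>();

    ServerNode(InMemoryBroker broker) { this.broker = broker; }

    /** Mirrors onOpen: keep the session locally and subscribe to the user's channel. */
    void connect(String loginName) {
        List<String> session = new ArrayList<>();
        sessions.put(loginName, session);
        broker.subscribe(TOPIC_PREFIX + loginName, session::add);
    }

    /** Mirrors the service-layer send: any instance may publish, whoever holds the session. */
    void send(String loginName, String msg) {
        broker.publish(TOPIC_PREFIX + loginName, msg);
    }

    List<String> delivered(String loginName) {
        return sessions.getOrDefault(loginName, Collections.emptyList());
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        ServerNode nodeA = new ServerNode(broker);
        ServerNode nodeB = new ServerNode(broker);
        nodeB.connect("admin");           // the WebSocket was routed to node B
        nodeA.send("admin", "task done"); // the triggering request landed on node A
        System.out.println(nodeB.delivered("admin"));
    }
}
```

The request that triggers a push never needs to know which instance holds the session; it only publishes to the user's channel.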

2. Environment preparation

Look up the WebSocket starter at mvnRepository.com/…

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.2.10.RELEASE</version>
</dependency>


Also add the Redis dependency, again using the starter:

        <!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

3. Code

Redis listener configuration:


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * @description: Redis listener configuration class
 * @author: weirx
 * @date: 2021/3/22 14:08
 * @version: 3.0
 */
@Configuration
public class RedisConfig {

    /**
     * description: Registers the Redis message listener container in the IoC container
     * @param redisConnectionFactory
     * @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
     * @author: weirx
     * @time: 2021/3/22 14:11
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }
}

WebSocket configuration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @description: WebSocket configuration class
 * @author: weirx
 * @date: 2021/3/22
 * @version: 3.0
 */
@Configuration
public class WebSocketConfig {

    /**
     * description: Injects ServerEndpointExporter. This bean automatically registers
     * the WebSocket endpoints declared with the @ServerEndpoint annotation.
     * Do not inject ServerEndpointExporter if you deploy to a separate servlet container
     * instead of Spring Boot's built-in one, because the container then provides and manages it.
     * @return: org.springframework.web.socket.server.standard.ServerEndpointExporter
     * @author: weirx
     * @time: 2021/3/22 14:12
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

Redis utility class:

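import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publish a message to a Redis channel
     *
     * @param key   the channel (topic) name
     * @param value the message body
     */
    public void publish(String key, String value) {
        stringRedisTemplate.convertAndSend(key, value);
    }
}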

WebSocket server class:

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * description: @ServerEndpoint is a class-level annotation. Its main function is to define
 * the current class as a WebSocket server endpoint; the value of the annotation is the URL
 * that clients connect to. The only difference when using Spring Boot is the @Component
 * annotation: normally the servlet container itself manages WebSocket endpoints, but in
 * Spring Boot even the container is managed by Spring.
 *
 * @author: weirx
 * @time: 2021/3/22
 */
@Slf4j
@Component
@ServerEndpoint("/websocket/server/{loginName}")
public class WebSocketServer {

    /**
     * @ServerEndpoint classes do not support injection, so the bean is fetched from the
     * IoC container through the ApplicationContextProvider helper
     */
    private RedisMessageListenerContainer redisMessageListenerContainer =
            ApplicationContextProvider.getBean(RedisMessageListenerContainer.class);

    /** Static counter for the current number of online connections; must be thread-safe. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * Thread-safe Set from java.util.concurrent holding the WebSocketServer object of each client.
     * To let the server talk to a single client, use a Map whose key identifies the user.
     */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /** Session of the connection to one client; data is sent to the client through it. */
    private Session session;

    /** Redis listener */
    private SubscribeListener subscribeListener;

    /**
     * Called when a connection has been established successfully
     *
     * @param session optional parameter; the session of the connection to the client
     */
    @OnOpen
    public void onOpen(@PathParam("loginName") String loginName, Session session) {
        this.session = session;
        // Add to the set
        webSocketSet.add(this);
        // Increase the online count by 1
        addOnlineCount();
        log.info("New connection [{}] joined! The number of current online users is {}", loginName, getOnlineCount());
        subscribeListener = new SubscribeListener();
        subscribeListener.setSession(session);
        // Subscribe to this user's topic
        redisMessageListenerContainer.addMessageListener(
                subscribeListener, new ChannelTopic(Constants.TOPIC_PREFIX + loginName));
    }

    /** Called when the connection is closed */
    @OnClose
    public void onClose() {
        // Remove from the set
        webSocketSet.remove(this);
        // Decrease the online count by 1
        subOnlineCount();
        redisMessageListenerContainer.removeMessageListener(subscribeListener);
        log.info("A connection was closed! The number of current online users is {}", getOnlineCount());
    }

    /**
     * Called after a message is received from the client
     *
     * @param message the message sent by the client
     * @param session optional parameter
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("Message from client: {}", message);
        // Broadcast to every client connected to this instance
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                log.error("Error sending message: msg = {}", message, e);
            }
        }
    }

    /**
     * Called when an error occurs
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket error", error);
    }

    /**
     * Unlike the methods above, this one carries no annotation; add such methods as needed.
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public int getOnlineCount() {
        return onlineCount.get();
    }

    public void addOnlineCount() {
        WebSocketServer.onlineCount.getAndIncrement();
    }

    public void subOnlineCount() {
        WebSocketServer.onlineCount.getAndDecrement();
    }
}
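The `SubscribeListener` used by `WebSocketServer` is not listed in this article. The following is only a guess at its core logic: decode the Redis message body and write it to the WebSocket session. So that the snippet compiles without Spring or a servlet container, `TextSession` is a hypothetical stand-in for `javax.websocket.Session`, and `onMessage(byte[])` takes what `Message.getBody()` would return. In the real project the class would presumably implement `org.springframework.data.redis.connection.MessageListener`, whose method is `onMessage(Message message, byte[] pattern)`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for javax.websocket.Session (isOpen + getBasicRemote().sendText). */
interface TextSession {
    boolean isOpen();
    void sendText(String text) throws IOException;
}

/** Sketch of a listener that forwards each Redis message to one WebSocket session. */
class SubscribeListenerSketch {
    private TextSession session;

    public void setSession(TextSession session) {
        this.session = session;
    }

    /**
     * @param body raw message bytes; StringRedisTemplate publishes strings as UTF-8
     * @return true if the text was written to the session, false otherwise
     */
    public boolean onMessage(byte[] body) {
        if (session == null || !session.isOpen()) {
            return false; // nothing to forward to
        }
        String text = new String(body, StandardCharsets.UTF_8);
        try {
            session.sendText(text);
            return true;
        } catch (IOException e) {
            return false; // the real class would log this
        }
    }
}
```

Checking `isOpen()` before writing avoids an exception when a message arrives between the client disconnecting and `onClose` removing the listener.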

Publishing a message through Redis:

    @Autowired
    private RedisUtil redisUtil;

    @Override
    public Result send(String loginName, String msg) {
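        // Push to the WebSocket through Redis. The channel name must equal the one
        // WebSocketServer subscribes to (Constants.TOPIC_PREFIX + loginName)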
        redisUtil.publish("TOPIC" + loginName, msg);
        return Result.success();
    }
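The front-end handler in this article calls `JSON.parse` on every frame it receives, so the `msg` passed to `send` has to be valid JSON text, otherwise the page throws while parsing. A minimal sketch of wrapping a plain string follows. The `JsonText` class and the `"message"` field name are made up for illustration; in a Spring Boot project a JSON library such as Jackson would normally do this.

```java
/** Illustrative helper that turns a plain string into JSON text. */
final class JsonText {
    private JsonText() {}

    /** Returns s as a JSON string literal, escaping quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Wraps text in a one-field JSON object; the field name is an assumption. */
    static String message(String text) {
        return "{\"message\":" + quote(text) + "}";
    }
}
```

With this helper, the service would publish `JsonText.message(msg)` instead of the raw `msg`.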

Front-end Vue code:

<template>
  <div class="dashboard-container">
    <div class="dashboard-text">{{responseData}}</div>
  </div>
</template>

<script>
  import {mapGetters} from 'vuex'

  export default {
    data() {
      return {
        websock: null,
        responseData: null
      }
    },
    created() {
      this.initWebSocket();
    },
    destroyed() {
      this.websock.close() // Disconnect the WebSocket after leaving the route
    },
    methods: {
      // Initialize the WebSocket
      initWebSocket() {
        const wsUri = "ws://127.0.0.1:21116/websocket/server/" + "admin";
        this.websock = new WebSocket(wsUri);
        this.websock.onmessage = this.websocketonmessage;
        this.websock.onopen = this.websocketonopen;
        this.websock.onerror = this.websocketonerror;
        this.websock.onclose = this.websocketclose;
      },
      websocketonopen() { // After the connection is established, send data with the send method
        let actions = {"User Account": "admin"};
        this.websocketsend(JSON.stringify(actions));
      },
      websocketonerror() { // The connection failed: reconnect
        this.initWebSocket();
      },
      websocketonmessage(e) { // Receive data
        const redata = JSON.parse(e.data);
        this.responseData = redata;
      },
      websocketsend(Data) { // Send data
        this.websock.send(Data);
      },
      websocketclose(e) { // Connection closed
        console.log('Disconnect', e);
      }
    },
    name: 'Dashboard',
    computed: {
      ...mapGetters([
        'name',
        'roles'
      ])
    }
  }
</script>

4. Test
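One way to exercise the endpoint without the Vue page is a small client built on the JDK's `java.net.http.WebSocket` (Java 11 or later). This is a sketch, not the test the author ran: the class and method names are illustrative, and the host, port and login name in the usage note are the ones from the front-end example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

class WsSmokeTest {

    /** Builds the endpoint URI matching @ServerEndpoint("/websocket/server/{loginName}"). */
    static URI endpoint(String host, int port, String loginName) {
        return URI.create("ws://" + host + ":" + port + "/websocket/server/" + loginName);
    }

    /** Prints every text frame pushed by the server. */
    static final class PrintingListener implements WebSocket.Listener {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            System.out.println("received: " + data);
            webSocket.request(1); // ask for the next frame
            return null;
        }
    }

    /** Connects, prints pushed messages for the given number of seconds, then closes. */
    static void listenFor(URI uri, long seconds) throws Exception {
        WebSocket webSocket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, new PrintingListener())
                .get(5, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(seconds);
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
    }
}
```

For example, start two back-end instances behind the proxy, call `WsSmokeTest.listenFor(WsSmokeTest.endpoint("127.0.0.1", 21116, "admin"), 60)`, and trigger `send("admin", ...)` several times so that the requests reach both instances. Every message should be printed regardless of which instance handled the request.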

5. Proxy configuration

In real deployments, WebSocket requests from the front end reach the server through a proxy that forwards them. Here are two configurations, for two different proxies, that serve the same purpose:

5.1 Gateway configuration

Spring Cloud Gateway is a gateway widely used in microservice architectures. WebSocket requests can be proxied through it with the following configuration.

Static route configuration:

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true # enable the function of dynamically creating routes from registries using microservice names
      routes:
        - id: websocket # keep the route ID unique
          uri: lb:ws://inbox-model # URI refers to the target service address, lb means to obtain the service from the registry
          predicates:
            - Path=/websocket/server/**
          filters:
            - StripPrefix=0

Dynamic route configuration (the gateway must support dynamic routing and have it enabled):

[{
    "id": "websocket",
    "order": 2,
    "predicates": [{
        "args": {
            "pattern": "/websocket/server/**"
        },
        "name": "Path"
    }],
    "uri": "lb:ws://inbox-model"
}]

5.2 nginx configuration

location /websocket {
    proxy_pass http://gateway;
    # The following three directives are the officially documented settings for WebSocket proxying
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    # Without a longer read timeout, nginx closes a connection that has been idle for a while
    # and the client stops receiving messages. Keep the timeout consistent with the login session
    proxy_read_timeout 3600s;
}