In the previous article (www.zifangsky.cn/1355.html) I introduced in the Spring project using WebSocket implementation ways. However, the previous article only showed that the server uses broadcast mode to send messages to all clients, whereas we sometimes need the server to send messages to the client of a specified user (e.g., sending Web notifications, printing logs of user tasks in real time, peer-to-peer chats between two users, etc.).

As for how the server sends messages to the client of the specified user, there are generally three ways to achieve this:

  • Plan a: WebSocket is implemented using the “@Serverendpoint annotation provided by Java” or using the “Spring low-level API” to establish connections fromHttpSessionTo obtain the user’s login user name, and then add theUser name + the WebSocket connection“Stored in theConcurrentHashMap. To send a message to the specified user, you only need toGet the WebSocket connection established by the receiver based on the user name of the receiver, and then send a message to him.
  • Scheme 2: Dynamically add the user ID/ user name of the current login to the front of the listening path of the page. In this way, you only need to send broadcast messages to the client that listens to the previous path.
  • Plan 3: This scheme is similar to Scheme 1. Implement WebSocket using Spring’s advanced API, and then customize itHandshakeHandlerClass and writedetermineUserMethod, the purpose of which is to use the user’s login username as the credential of the WebSocket when establishing a connection, which we can finally usemessagingTemplate.convertAndSendToUserMethod sends a message to the specified user.

Note: The full source code for this article can be found at github.com/zifangsky/W…

Use SimpMessagingTemplate to send messages

Use org. Springframework. Messaging. Simp. SimpMessagingTemplate classes can be anywhere in the server send a message to the client. In addition, the SimpMessagingTemplate class is automatically assembled into the Spring context after we configure Spring to support STOMP, So we just need to inject the SimpMessagingTemplate with the @AutoWired annotation where we want to use it.

It should be noted that the SimpMessagingTemplate class has two important methods:

  • public void convertAndSend(D destination, Object payload): The path is monitoreddestinationAll clients send messagespayload
  • public void convertAndSendToUser(String user, String destination, Object payload): The path is monitoreddestinationThe useruserSend a messagepayload

A simple example:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

/** * test {@linkOrg. Springframework. Messaging. Simp. SimpMessagingTemplate} class basic usage of *@author zifangsky
 * @date 2018/10/10
 * @since1.0.0 * /
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** * Simple test SimpMessagingTemplate */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting".new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok"; }}Copy the code

This is obviously the address that the last example in the previous article was listening for. After the client page is connected, we request the above method using Postman. The effect is as follows:

Then we can see that the page also received the message:

Sends WebSocket messages to the specified user and handles the case that the user is not online

To send a message to a specified user:

  • If the receiver is online, the message is sent directly.
  • Otherwise, the message will be stored in Redis, and the user will pull the unread message after going online.

(1) Custom HandshakeInterceptor, which prevents users from connecting to WebSocket when they are not logged in:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;

/** * custom {@linkOrg. Springframework. Web. Socket. Server HandshakeInterceptor}, realize "need to log in to allow connectivity WebSocket" * *@author zifangsky
 * @date 2018/10/11
 * @since1.0.0 * /
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser ! =null){
            logger.debug(MessageFormat.format("User {0} requested a WebSocket connection", loginUser.getUsername()));
            return true;
        }else{
            logger.error("Do not connect to WebSocket if you are not logged in.");
            return false; }}@Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}}Copy the code

(2) Custom HandshakeHandler for setting up websockets with custom principals:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;

/** * custom {@linkOrg. Springframework. Web. Socket. Server. Support. DefaultHandshakeHandler}, realize "generate custom {@linkJava. Security. Principal} "* *@author zifangsky
 * @date 2018/10/11
 * @since1.0.0 * /
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser ! =null){
            logger.debug(MessageFormat.format("WebSocket connection started to create Principal, user: {0}", loginUser.getUsername()));
            return new MyPrincipal(loginUser.getUsername());
        }else{
            logger.error("Do not connect to WebSocket if you are not logged in.");
            return null; }}}Copy the code

Accordingly, MyPrincipal here inherits the java.security.Principal class:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import java.security.Principal;

/** * custom {@link java.security.Principal}
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since1.0.0 * /
public class MyPrincipal implements Principal {
    private String loginName;

    public MyPrincipal(String loginName) {
        this.loginName = loginName;
    }

    @Override
    public String getName(a) {
        returnloginName; }}Copy the code

(3) Customize the ChannelInterceptor, which is used to log when the user disconnects:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import java.security.Principal;
import java.text.MessageFormat;

/** * custom {@linkOrg. Springframework. Messaging. Support. ChannelInterceptor}, disconnected implementation processing of * *@author zifangsky
 * @date 2018/10/10
 * @since1.0.0 * /
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void afterSendCompletion(Message<? > message, MessageChannel channel,boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        // The user is disconnected
        if(StompCommand.DISCONNECT.equals(command)){
            String user = "";
            Principal principal = accessor.getUser();
            if(principal ! =null && StringUtils.isNoneBlank(principal.getName())){
                user = principal.getName();
            }else{
                user = accessor.getSessionId();
            }

            logger.debug(MessageFormat.format(The WebSocket connection for user {0} has been disconnected., user)); }}}Copy the code

(4) Complete configuration related to WebSocket:

package cn.zifangsky.stompwebsocket.config;

import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/** * WebSocket configuration **@author zifangsky
 * @date 2018/9/30
 * @since1.0.0 * /
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();

        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // The client needs to send messages to the /message/ XXX address
        registry.setApplicationDestinationPrefixes("/message");
        // The server broadcasts the message path prefix. The client needs to subscribe to the message address /topic/yyy accordingly
        registry.enableSimpleBroker("/topic");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptor); }}Copy the code

(5) Message processing in Controller is as follows:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
import cn.zifangsky.stompwebsocket.model.User;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import cn.zifangsky.stompwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** * test {@linkOrg. Springframework. Messaging. Simp. SimpMessagingTemplate} class basic usage of *@author zifangsky
 * @date 2018/10/10
 * @since1.0.0 * /
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** * Simple test SimpMessagingTemplate */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting".new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok";
    }

    /** * Sends a WebSocket message to the specified user */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        // Message receiver
        String receiver = request.getParameter("receiver");
        // Message content
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));

        return "ok";
    }

    /** * sends a message to the specified user and handles the case when the receiver is not online *@paramSender Message sender *@paramReceiver Message receiver *@paramDestination *@paramPayload Message body */
    private void sendToUser(String sender, String receiver, String destination, String payload){
        SimpUser simpUser = userRegistry.getUser(receiver);

        // If the receiver exists, the message is sent
        if(simpUser ! =null && StringUtils.isNoneBlank(simpUser.getName())){
            this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        }
        // Otherwise, the message will be stored in Redis, and the user will pull the unread message after online
        else{
            // The name of the Redis list to store messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("Message receiver {0} has not established a WebSocket connection, the message [{2}] sent by {1} will be stored in the [{3}] list of Redis.", receiver, sender, payload, listKey));

            // Store messages to RedisredisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload); }}/** * pulls an unread WebSocket message * from the specified listening path@paramDestination Specifies the listening path *@return java.util.Map<java.lang.String,java.lang.Object>
     */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination){
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            // Current login user
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            // The name of the Redis list to store messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            // Pull all unread messages from Redis
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

            result.put("code"."200");
            if(messageList ! =null && messageList.size() > 0) {// Delete the list of unread messages in Redis
                redisService.delete(listKey);
                // Add the data to the return set for display on the foreground page
                result.put("result", messageList); }}catch (Exception e){
            result.put("code"."500");
            result.put("msg", e.getMessage());
        }

        returnresult; }}Copy the code

Note: The corresponding Redis operations are as follows:

@Override
public boolean delete(String key) {
	return redisTemplate.delete(key);
}

@Override
public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
	// Bind operation
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	// Insert data
	boundValueOperations.leftPushAll(values);
	// Set the expiration time
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
	// Bind operation
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	// Insert data
	boundValueOperations.rightPushAll(values);
	// Set the expiration time
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public List<Object> rangeList(String listKey, long start, long end) {
	// Bind operation
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	// Query data
	return boundValueOperations.range(start, end);
}
Copy the code

(6) Sample page:

<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta content="text/html; charset=UTF-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script th:src="@{/layui/layui.js}"></script>
    <script th:src="@{/layui/lay/modules/layer.js}"></script>
    <link th:href="@{/layui/css/layui.css}" rel="stylesheet">
    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
    <link th:href="@{/css/style.css}" rel="stylesheet">
    <style type="text/css">
        #connect-container {
            margin: 0 auto;
            width: 400px;
        }

        #connect-container div {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .message input {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .layui-btn {
            display: inline-block;
        }
    </style>
    <script type="text/javascript">
        var stompClient = null;

        $(function () {
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val()); }});function setConnected(connected) {
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");

            if (connected) {
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }

            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }

        / / the connection
        function connect() {
            var target = $("#target").val();

            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);

            stompClient.connect({}, function () {
                setConnected(true);
                log('Info: STOMP connection opened.');

                // After a successful connection, pull unread messages
                pullUnreadMessage("/topic/reply");

                // Subscribe to the server's /topic/reply address
                stompClient.subscribe("/user/topic/reply".function (response) {
                    log(JSON.parse(response.body).content); })},function () {
                // Disconnect processing
                setConnected(false);
                log('Info: STOMP connection closed.');
            });
        }

        // Disconnect the connection
        function disconnect() {
            if(stompClient ! =null) {
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            log('Info: STOMP connection closed.');
        }

        // Send a message to the specified user
        function sendMessage() {
            if(stompClient ! =null) {
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg}));

                $.ajax({
                    url: "/wsTemplate/sendToUser".type: "POST".dataType: "json".async: true.data: {
                        "receiver": receiver,
                        "msg": msg
                    },
                    success: function (data) {}}); }else {
                layer.msg('STOMP connection not established, please connect.', {
                    offset: 'auto'
                    ,icon: 2}); }}// Pull unread messages from the server
        function pullUnreadMessage(destination) {
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage".type: "POST".dataType: "json".async: true.data: {
                    "destination": destination
                },
                success: function (data) {
                    if(data.result ! =null) {
                        $.each(data.result, function (i, item) {
                            log(JSON.parse(item).content); })}else if(data.code ! =null && data.code == "500") {
                        layer.msg(data.msg, {
                            offset: 'auto'
                            ,icon: 2}); }}}); }// Log output
        function log(message) {
            console.debug(message);
        }
    </script>
</head>
<body>
    <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
        enabled. Please enable
        Javascript and reload this page!</h2></noscript>
    <div>
        <div id="connect-container" class="layui-elem-field">
            <legend>Chat With STOMP Message</legend>
            <div>
                <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
            </div>
            <div>
                <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
                <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                        onclick="disconnect();">Disconnect
                </button>

            </div>
            <div class="message">
                <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Name of recipient" value=""/>
                <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Message content" value=""/>
            </div>
            <div>
                <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                        onclick="sendMessage();">Send Message
                </button>
            </div>
        </div>
    </div>
</body>
</html>
Copy the code

After starting the project, log in to each browser using a different account, and then send messages to each other with the following effect:

Interface:

Interface 2: