In the previous article (www.zifangsky.cn/1355.html) I introduced in the Spring project using WebSocket implementation ways. However, the previous article only showed that the server uses broadcast mode to send messages to all clients, whereas we sometimes need the server to send messages to the client of a specified user (e.g., sending Web notifications, printing logs of user tasks in real time, peer-to-peer chats between two users, etc.).
As for how the server sends messages to the client of the specified user, there are generally three ways to achieve this:
- Plan a: WebSocket is implemented using the “@Serverendpoint annotation provided by Java” or using the “Spring low-level API” to establish connections from
HttpSession
To obtain the user’s login user name, and then add theUser name + the WebSocket connection“Stored in theConcurrentHashMap
. To send a message to the specified user, you only need toGet the WebSocket connection established by the receiver based on the user name of the receiver, and then send a message to him. - Scheme 2: Dynamically add the user ID/ user name of the current login to the front of the listening path of the page. In this way, you only need to send broadcast messages to the client that listens to the previous path.
- Plan 3: This scheme is similar to Scheme 1. Implement WebSocket using Spring’s advanced API, and then customize it
HandshakeHandler
Class and writedetermineUser
Method, the purpose of which is to use the user’s login username as the credential of the WebSocket when establishing a connection, which we can finally usemessagingTemplate.convertAndSendToUser
Method sends a message to the specified user.
Note: The full source code for this article can be found at github.com/zifangsky/W…
Use SimpMessagingTemplate to send messages
Use org. Springframework. Messaging. Simp. SimpMessagingTemplate classes can be anywhere in the server send a message to the client. In addition, the SimpMessagingTemplate class is automatically assembled into the Spring context after we configure Spring to support STOMP, So we just need to inject the SimpMessagingTemplate with the @AutoWired annotation where we want to use it.
It should be noted that the SimpMessagingTemplate class has two important methods:
public void convertAndSend(D destination, Object payload)
: The path is monitoreddestination
All clients send messagespayload
public void convertAndSendToUser(String user, String destination, Object payload)
: The path is monitoreddestination
The useruser
Send a messagepayload
A simple example:
package cn.zifangsky.stompwebsocket.controller;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
/** * test {@linkOrg. Springframework. Messaging. Simp. SimpMessagingTemplate} class basic usage of *@author zifangsky
* @date 2018/10/10
* @since1.0.0 * /
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
/** * Simple test SimpMessagingTemplate */
@PostMapping("/greeting")
@ResponseBody
public String greeting(@RequestBody Greeting greeting) {
this.messagingTemplate.convertAndSend("/topic/greeting".new HelloMessage("Hello," + greeting.getName() + "!"));
return "ok"; }}Copy the code
This is obviously the address that the last example in the previous article was listening for. After the client page is connected, we request the above method using Postman. The effect is as follows:
Then we can see that the page also received the message:
Sends WebSocket messages to the specified user and handles the case that the user is not online
To send a message to a specified user:
- If the receiver is online, the message is sent directly.
- Otherwise, the message will be stored in Redis, and the user will pull the unread message after going online.
(1) Custom HandshakeInterceptor, which prevents users from connecting to WebSocket when they are not logged in:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
/** * custom {@linkOrg. Springframework. Web. Socket. Server HandshakeInterceptor}, realize "need to log in to allow connectivity WebSocket" * *@author zifangsky
* @date 2018/10/11
* @since1.0.0 * /
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
if(loginUser ! =null){
logger.debug(MessageFormat.format("User {0} requested a WebSocket connection", loginUser.getUsername()));
return true;
}else{
logger.error("Do not connect to WebSocket if you are not logged in.");
return false; }}@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}}Copy the code
(2) Custom HandshakeHandler for setting up websockets with custom principals:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;
/** * custom {@linkOrg. Springframework. Web. Socket. Server. Support. DefaultHandshakeHandler}, realize "generate custom {@linkJava. Security. Principal} "* *@author zifangsky
* @date 2018/10/11
* @since1.0.0 * /
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
if(loginUser ! =null){
logger.debug(MessageFormat.format("WebSocket connection started to create Principal, user: {0}", loginUser.getUsername()));
return new MyPrincipal(loginUser.getUsername());
}else{
logger.error("Do not connect to WebSocket if you are not logged in.");
return null; }}}Copy the code
Accordingly, MyPrincipal here inherits the java.security.Principal class:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import java.security.Principal;
/** * custom {@link java.security.Principal}
*
* @author zifangsky
* @date 2018/10/11
* @since1.0.0 * /
public class MyPrincipal implements Principal {
private String loginName;
public MyPrincipal(String loginName) {
this.loginName = loginName;
}
@Override
public String getName(a) {
returnloginName; }}Copy the code
(3) Customize the ChannelInterceptor, which is used to log when the user disconnects:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import java.security.Principal;
import java.text.MessageFormat;
/** * custom {@linkOrg. Springframework. Messaging. Support. ChannelInterceptor}, disconnected implementation processing of * *@author zifangsky
* @date 2018/10/10
* @since1.0.0 * /
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void afterSendCompletion(Message<? > message, MessageChannel channel,boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
// The user is disconnected
if(StompCommand.DISCONNECT.equals(command)){
String user = "";
Principal principal = accessor.getUser();
if(principal ! =null && StringUtils.isNoneBlank(principal.getName())){
user = principal.getName();
}else{
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format(The WebSocket connection for user {0} has been disconnected., user)); }}}Copy the code
(4) Complete configuration related to WebSocket:
package cn.zifangsky.stompwebsocket.config;
import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/** * WebSocket configuration **@author zifangsky
* @date 2018/9/30
* @since1.0.0 * /
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Autowired
private AuthHandshakeInterceptor authHandshakeInterceptor;
@Autowired
private MyHandshakeHandler myHandshakeHandler;
@Autowired
private MyChannelInterceptor myChannelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp-websocket").withSockJS();
registry.addEndpoint("/chat-websocket")
.addInterceptors(authHandshakeInterceptor)
.setHandshakeHandler(myHandshakeHandler)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// The client needs to send messages to the /message/ XXX address
registry.setApplicationDestinationPrefixes("/message");
// The server broadcasts the message path prefix. The client needs to subscribe to the message address /topic/yyy accordingly
registry.enableSimpleBroker("/topic");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptor); }}Copy the code
(5) Message processing in Controller is as follows:
package cn.zifangsky.stompwebsocket.controller;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
import cn.zifangsky.stompwebsocket.model.User;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import cn.zifangsky.stompwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** * test {@linkOrg. Springframework. Messaging. Simp. SimpMessagingTemplate} class basic usage of *@author zifangsky
* @date 2018/10/10
* @since1.0.0 * /
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
/** * Simple test SimpMessagingTemplate */
@PostMapping("/greeting")
@ResponseBody
public String greeting(@RequestBody Greeting greeting) {
this.messagingTemplate.convertAndSend("/topic/greeting".new HelloMessage("Hello," + greeting.getName() + "!"));
return "ok";
}
/** * Sends a WebSocket message to the specified user */
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request) {
// Message receiver
String receiver = request.getParameter("receiver");
// Message content
String msg = request.getParameter("msg");
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));
return "ok";
}
/** * sends a message to the specified user and handles the case when the receiver is not online *@paramSender Message sender *@paramReceiver Message receiver *@paramDestination *@paramPayload Message body */
private void sendToUser(String sender, String receiver, String destination, String payload){
SimpUser simpUser = userRegistry.getUser(receiver);
// If the receiver exists, the message is sent
if(simpUser ! =null && StringUtils.isNoneBlank(simpUser.getName())){
this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
}
// Otherwise, the message will be stored in Redis, and the user will pull the unread message after online
else{
// The name of the Redis list to store messages
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
logger.info(MessageFormat.format("Message receiver {0} has not established a WebSocket connection, the message [{2}] sent by {1} will be stored in the [{3}] list of Redis.", receiver, sender, payload, listKey));
// Store messages to RedisredisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload); }}/** * pulls an unread WebSocket message * from the specified listening path@paramDestination Specifies the listening path *@return java.util.Map<java.lang.String,java.lang.Object>
*/
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map<String, Object> pullUnreadMessage(String destination){
Map<String, Object> result = new HashMap<>();
try {
HttpSession session = SpringContextUtils.getSession();
// Current login user
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
// The name of the Redis list to store messages
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
// Pull all unread messages from Redis
List<Object> messageList = redisService.rangeList(listKey, 0, -1);
result.put("code"."200");
if(messageList ! =null && messageList.size() > 0) {// Delete the list of unread messages in Redis
redisService.delete(listKey);
// Add the data to the return set for display on the foreground page
result.put("result", messageList); }}catch (Exception e){
result.put("code"."500");
result.put("msg", e.getMessage());
}
returnresult; }}Copy the code
Note: The corresponding Redis operations are as follows:
@Override
public boolean delete(String key) {
return redisTemplate.delete(key);
}
@Override
public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
// Bind operation
BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
// Insert data
boundValueOperations.leftPushAll(values);
// Set the expiration time
boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}
@Override
public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
// Bind operation
BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
// Insert data
boundValueOperations.rightPushAll(values);
// Set the expiration time
boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}
@Override
public List<Object> rangeList(String listKey, long start, long end) {
// Bind operation
BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
// Query data
return boundValueOperations.range(start, end);
}
Copy the code
(6) Sample page:
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta content="text/html; charset=UTF-8"/>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Chat With STOMP Message</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script th:src="@{/layui/layui.js}"></script>
<script th:src="@{/layui/lay/modules/layer.js}"></script>
<link th:href="@{/layui/css/layui.css}" rel="stylesheet">
<link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
<link th:href="@{/css/style.css}" rel="stylesheet">
<style type="text/css">
#connect-container {
margin: 0 auto;
width: 400px;
}
#connect-container div {
padding: 5px;
margin: 0 7px 10px 0;
}
.message input {
padding: 5px;
margin: 0 7px 10px 0;
}
.layui-btn {
display: inline-block;
}
</style>
<script type="text/javascript">
var stompClient = null;
$(function () {
var target = $("#target");
if (window.location.protocol === 'http:') {
target.val('http://' + window.location.host + target.val());
} else {
target.val('https://' + window.location.host + target.val()); }});function setConnected(connected) {
var connect = $("#connect");
var disconnect = $("#disconnect");
var echo = $("#echo");
if (connected) {
connect.addClass("layui-btn-disabled");
disconnect.removeClass("layui-btn-disabled");
echo.removeClass("layui-btn-disabled");
} else {
connect.removeClass("layui-btn-disabled");
disconnect.addClass("layui-btn-disabled");
echo.addClass("layui-btn-disabled");
}
connect.attr("disabled", connected);
disconnect.attr("disabled", !connected);
echo.attr("disabled", !connected);
}
/ / the connection
function connect() {
var target = $("#target").val();
var ws = new SockJS(target);
stompClient = Stomp.over(ws);
stompClient.connect({}, function () {
setConnected(true);
log('Info: STOMP connection opened.');
// After a successful connection, pull unread messages
pullUnreadMessage("/topic/reply");
// Subscribe to the server's /topic/reply address
stompClient.subscribe("/user/topic/reply".function (response) {
log(JSON.parse(response.body).content); })},function () {
// Disconnect processing
setConnected(false);
log('Info: STOMP connection closed.');
});
}
// Disconnect the connection
function disconnect() {
if(stompClient ! =null) {
stompClient.disconnect();
stompClient = null;
}
setConnected(false);
log('Info: STOMP connection closed.');
}
// Send a message to the specified user
function sendMessage() {
if(stompClient ! =null) {
var receiver = $("#receiver").val();
var msg = $("#message").val();
log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg}));
$.ajax({
url: "/wsTemplate/sendToUser".type: "POST".dataType: "json".async: true.data: {
"receiver": receiver,
"msg": msg
},
success: function (data) {}}); }else {
layer.msg('STOMP connection not established, please connect.', {
offset: 'auto'
,icon: 2}); }}// Pull unread messages from the server
function pullUnreadMessage(destination) {
$.ajax({
url: "/wsTemplate/pullUnreadMessage".type: "POST".dataType: "json".async: true.data: {
"destination": destination
},
success: function (data) {
if(data.result ! =null) {
$.each(data.result, function (i, item) {
log(JSON.parse(item).content); })}else if(data.code ! =null && data.code == "500") {
layer.msg(data.msg, {
offset: 'auto'
,icon: 2}); }}}); }// Log output
function log(message) {
console.debug(message);
}
</script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<div id="connect-container" class="layui-elem-field">
<legend>Chat With STOMP Message</legend>
<div>
<input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
</div>
<div>
<button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
<button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="disconnect();">Disconnect
</button>
</div>
<div class="message">
<input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Name of recipient" value=""/>
<input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Message content" value=""/>
</div>
<div>
<button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="sendMessage();">Send Message
</button>
</div>
</div>
</div>
</body>
</html>
Copy the code
After starting the project, log in to each browser using a different account, and then send messages to each other with the following effect:
Interface:
Interface 2: