In the previous article (www.zifangsky.cn/1359.html) I introduced the service end users to specify how the client sends a message, and how to deal with each other online. In this article we continue to consider another important question, which is: If our project is distributed and logged-in users are assigned to multiple servers by Nginx’s reverse proxy, how can a user who has a WebSocket connection on one server send a message to a user who has a WebSocket connection on the other server?
In fact, to solve this problem, we need to implement distributed WebSocket, and distributed WebSocket can be generally implemented through the following two solutions:
- Solution a: Push the message (< user ID, message content >) to a topic in a message queue (Redis, Kafka, etc.). Then each application node subscribs to this topic. After receiving the WebSocket message, fetch the “user ID/ user name of the message receiver” of the message. Then it compares whether it has a connection with the corresponding user. If so, it will push the message, otherwise it will discard the received message (the application node where the message receiver resides will process it).
- Scheme 2: After the user establishes WebSocket connection, Redis cache is used to record the application node on which the user’s WebSocket is established, and message queue is also used to push the message to the application node where the receiver is located (it is more complicated than Scheme 1 in implementation, but the network traffic will be lower).
Note: The full source code for this article can be found at github.com/zifangsky/W…
In the following example, I will implement according to the relatively simple scheme one, which is as follows:
(1) Define a WebSocket Channel enumeration class:
package cn.zifangsky.mqwebsocket.enums;
import org.apache.commons.lang3.StringUtils;
/** * WebSocket Channel enumeration class **@author zifangsky
* @date 2018/10/16
* @since1.0.0 * /
public enum WebSocketChannelEnum {
// Test the use of simple peer-to-peer chat
CHAT("CHAT"."Easy peer-to-peer chat to test."."/topic/reply");
WebSocketChannelEnum(String code, String description, String subscribeUrl) {
this.code = code;
this.description = description;
this.subscribeUrl = subscribeUrl;
}
/** * unique CODE */
private String code;
/** * description */
private String description;
/** * The URL to which the WebSocket client subscribed */
private String subscribeUrl;
public String getCode(a) {
return code;
}
public String getDescription(a) {
return description;
}
public String getSubscribeUrl(a) {
return subscribeUrl;
}
/** * Find the enumeration class */ by CODE
public static WebSocketChannelEnum fromCode(String code){
if(StringUtils.isNoneBlank(code)){
for(WebSocketChannelEnum channelEnum : values()){
if(channelEnum.code.equals(code)){
returnchannelEnum; }}}return null; }}Copy the code
(2) Configuring message queues based on Redis:
About Redis implement of message queue can reference before I of this article: www.zifangsky.cn/1347.html
It is important to note that Redis is not recommended for medium to large scale formal projects as it has not been tested to be particularly reliable, so professional message queue middleware such as Kafka and rabbitMQ should be considered (PS: Redis 5.0’s new Data structure Streams has greatly enhanced Redis’s message queuing capabilities.
package cn.zifangsky.mqwebsocket.config;
import cn.zifangsky.mqwebsocket.mq.MessageReceiver;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Arrays;
/** * Redis configuration **@author zifangsky
* @date 2018/7/30
* @since1.0.0 * /
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
@Value("${spring.redis.timeout}")
private String timeOut;
@Value("${spring.redis.cluster.nodes}")
private String nodes;
@Value("${spring.redis.cluster.max-redirects}")
private int maxRedirects;
@Value("${spring.redis.jedis.pool.max-active}")
private int maxActive;
@Value("${spring.redis.jedis.pool.max-wait}")
private int maxWait;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Bean
public JedisPoolConfig jedisPoolConfig(a){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}
@Bean
public RedisClusterConfiguration redisClusterConfiguration(a){
RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));
configuration.setMaxRedirects(maxRedirects);
return configuration;
}
/** * JedisConnectionFactory */
@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){
return new JedisConnectionFactory(configuration,jedisPoolConfig);
}
/** * use Jackson to serialize objects */
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(a){
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(objectMapper);
return serializer;
}
/** * RedisTemplate */
@Bean
public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
// Serialize the KEY as a string
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// Serialize VALUE in JSON mode
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/** * message listener */
@Bean
MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
// The message receiver and the corresponding default processing method
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
// The deserialization of the message
messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
return messageListenerAdapter;
}
/** * message listener container */
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// Add a message listener
container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
returncontainer; }}Copy the code
Note that the configuration used here is as follows:
spring:
.
#redis
redis:
cluster:
nodes: namenode22:6379,datanode23:6379,datanode24:6379
max-redirects: 6
timeout: 300000
jedis:
pool:
max-active: 8
max-wait: 100000
max-idle: 8
min-idle: 0
# custom listening TOPIC path
message:
topic-name: topic-test
Copy the code
(3) Define a Redis message handler:
package cn.zifangsky.mqwebsocket.mq;
import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;
import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
/** * The handler of WebSocket messages in Redis **@author zifangsky
* @date 2018/10/16
* @since1.0.0 * /
@Component
public class MessageReceiver {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
/** * Process WebSocket messages */
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
//1. Obtain the user name and check whether it is connected to the WebSocket of the current application node
SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
if(simpUser ! =null && StringUtils.isNoneBlank(simpUser.getName())){
//2. Obtain the subscription address of the WebSocket client
WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if(channelEnum ! =null) {//3. Send messages to the WebSocket clientmessagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); }}}}Copy the code
(4) Send WebSocket message in Controller:
package cn.zifangsky.mqwebsocket.controller;
import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.enums.ExpireEnum;
import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;
import cn.zifangsky.mqwebsocket.service.RedisService;
import cn.zifangsky.mqwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** * test {@linkOrg. Springframework. Messaging. Simp. SimpMessagingTemplate} class basic usage of *@author zifangsky
* @date 2018/10/10
* @since1.0.0 * /
@Controller
@RequestMapping(("/wsTemplate"))
public class RedisMessageController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
/** * Sends a WebSocket message to the specified user */
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request) {
// Message receiver
String receiver = request.getParameter("receiver");
// Message content
String msg = request.getParameter("msg");
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
return "ok";
}
/** * sends a message to the specified user and handles the case when the receiver is not online *@paramSender Message sender *@paramReceiver Message receiver *@paramDestination *@paramPayload Message body */
private void sendToUser(String sender, String receiver, String destination, String payload){
SimpUser simpUser = userRegistry.getUser(receiver);
// If the receiver exists, the message is sent
if(simpUser ! =null && StringUtils.isNoneBlank(simpUser.getName())){
messagingTemplate.convertAndSendToUser(receiver, destination, payload);
}
If the receiver is online, the receiver is connected to another node in the cluster, and the node that the receiver is connected to needs to be notified to send the message
else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)){
RedisWebsocketMsg<String> redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
redisService.convertAndSend(topicName, redisWebsocketMsg);
}
// Otherwise, the message will be stored in Redis, and the user will pull the unread message after online
else{
// The name of the Redis list to store messages
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
logger.info(MessageFormat.format("Message receiver {0} has not established a WebSocket connection, the message [{2}] sent by {1} will be stored in the [{3}] list of Redis.", receiver, sender, payload, listKey));
// Store messages to RedisredisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload); }}/** * pulls an unread WebSocket message * from the specified listening path@paramDestination Specifies the listening path *@return java.util.Map<java.lang.String,java.lang.Object>
*/
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map<String, Object> pullUnreadMessage(String destination){
Map<String, Object> result = new HashMap<>();
try {
HttpSession session = SpringContextUtils.getSession();
// Current login user
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
// The name of the Redis list to store messages
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
// Pull all unread messages from Redis
List<Object> messageList = redisService.rangeList(listKey, 0, -1);
result.put("code"."200");
if(messageList ! =null && messageList.size() > 0) {// Delete the list of unread messages in Redis
redisService.delete(listKey);
// Add the data to the return set for display on the foreground page
result.put("result", messageList); }}catch (Exception e){
result.put("code"."500");
result.put("msg", e.getMessage());
}
returnresult; }}Copy the code
(5) Other interceptors handle WebSocket connection related issues:
I) AuthHandshakeInterceptor:
package cn.zifangsky.mqwebsocket.interceptor.websocket;
import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
/** * custom {@linkOrg. Springframework. Web. Socket. Server HandshakeInterceptor}, realize "need to log in to allow connectivity WebSocket" * *@author zifangsky
* @date 2018/10/11
* @since1.0.0 * /
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Resource(name = "redisServiceImpl")
private RedisService redisService;
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, loginUser.getUsername())){
logger.error("The same user is not allowed to create multiple connection websockets");
return false;
}else if(loginUser == null || StringUtils.isBlank(loginUser.getUsername())){
logger.error("Do not connect to WebSocket if you are not logged in.");
return false;
}else{
logger.debug(MessageFormat.format("User {0} requested a WebSocket connection", loginUser.getUsername()));
return true; }}@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}}Copy the code
Ii) MyHandshakeHandler:
package cn.zifangsky.mqwebsocket.interceptor.websocket;
import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;
/** * custom {@linkOrg. Springframework. Web. Socket. Server. Support. DefaultHandshakeHandler}, realize "generate custom {@linkJava. Security. Principal} "* *@author zifangsky
* @date 2018/10/11
* @since1.0.0 * /
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Resource(name = "redisServiceImpl")
private RedisService redisService;
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
if(loginUser ! =null){
logger.debug(MessageFormat.format("WebSocket connection started to create Principal, user: {0}", loginUser.getUsername()));
//1. Save the user name to Redis
redisService.addToSet(Constants.REDIS_WEBSOCKET_USER_SET, loginUser.getUsername());
//2. Return the custom Principal
return new MyPrincipal(loginUser.getUsername());
}else{
logger.error("Do not connect to WebSocket if you are not logged in.");
return null; }}}Copy the code
Iii) MyChannelInterceptor:
package cn.zifangsky.mqwebsocket.interceptor.websocket;
import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.security.Principal;
import java.text.MessageFormat;
/** * custom {@linkOrg. Springframework. Messaging. Support. ChannelInterceptor}, disconnected implementation processing of * *@author zifangsky
* @date 2018/10/10
* @since1.0.0 * /
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
private final Logger logger = LoggerFactory.getLogger(getClass());
@Resource(name = "redisServiceImpl")
private RedisService redisService;
@Override
public void afterSendCompletion(Message<? > message, MessageChannel channel,boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
// The user is disconnected
if(StompCommand.DISCONNECT.equals(command)){
String user = "";
Principal principal = accessor.getUser();
if(principal ! =null && StringUtils.isNoneBlank(principal.getName())){
user = principal.getName();
// Remove the user from Redis
redisService.removeFromSet(Constants.REDIS_WEBSOCKET_USER_SET, user);
}else{
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format(The WebSocket connection for user {0} has been disconnected., user)); }}}Copy the code
(6) WebSocket configuration:
package cn.zifangsky.mqwebsocket.config;
import cn.zifangsky.mqwebsocket.interceptor.websocket.MyHandshakeHandler;
import cn.zifangsky.mqwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.mqwebsocket.interceptor.websocket.MyChannelInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/** * WebSocket configuration **@author zifangsky
* @date 2018/9/30
* @since1.0.0 * /
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Autowired
private AuthHandshakeInterceptor authHandshakeInterceptor;
@Autowired
private MyHandshakeHandler myHandshakeHandler;
@Autowired
private MyChannelInterceptor myChannelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat-websocket")
.addInterceptors(authHandshakeInterceptor)
.setHandshakeHandler(myHandshakeHandler)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// The client needs to send messages to the /message/ XXX address
registry.setApplicationDestinationPrefixes("/message");
// The server broadcasts the message path prefix. The client needs to subscribe to the message address /topic/yyy accordingly
registry.enableSimpleBroker("/topic");
// The path prefix to send messages to the specified user. The default value is /user/
registry.setUserDestinationPrefix("/user/");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptor); }}Copy the code
(7) Sample page:
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta content="text/html; charset=UTF-8"/>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Chat With STOMP Message</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script th:src="@{/layui/layui.js}"></script>
<script th:src="@{/layui/lay/modules/layer.js}"></script>
<link th:href="@{/layui/css/layui.css}" rel="stylesheet">
<link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
<link th:href="@{/css/style.css}" rel="stylesheet">
<style type="text/css">
#connect-container {
margin: 0 auto;
width: 400px;
}
#connect-container div {
padding: 5px;
margin: 0 7px 10px 0;
}
.message input {
padding: 5px;
margin: 0 7px 10px 0;
}
.layui-btn {
display: inline-block;
}
</style>
<script type="text/javascript">
var stompClient = null;
$(function () {
var target = $("#target");
if (window.location.protocol === 'http:') {
target.val('http://' + window.location.host + target.val());
} else {
target.val('https://' + window.location.host + target.val()); }});function setConnected(connected) {
var connect = $("#connect");
var disconnect = $("#disconnect");
var echo = $("#echo");
if (connected) {
connect.addClass("layui-btn-disabled");
disconnect.removeClass("layui-btn-disabled");
echo.removeClass("layui-btn-disabled");
} else {
connect.removeClass("layui-btn-disabled");
disconnect.addClass("layui-btn-disabled");
echo.addClass("layui-btn-disabled");
}
connect.attr("disabled", connected);
disconnect.attr("disabled", !connected);
echo.attr("disabled", !connected);
}
/ / the connection
function connect() {
var target = $("#target").val();
var ws = new SockJS(target);
stompClient = Stomp.over(ws);
stompClient.connect({}, function () {
setConnected(true);
log('Info: STOMP connection opened.');
// After a successful connection, pull unread messages
pullUnreadMessage("/topic/reply");
// Subscribe to the server's /topic/reply address
stompClient.subscribe("/user/topic/reply".function (response) {
log(JSON.parse(response.body).content); })},function () {
// Disconnect processing
setConnected(false);
log('Info: STOMP connection closed.');
});
}
// Disconnect the connection
function disconnect() {
if(stompClient ! =null) {
stompClient.disconnect();
stompClient = null;
}
setConnected(false);
log('Info: STOMP connection closed.');
}
// Send a message to the specified user
function sendMessage() {
if(stompClient ! =null) {
var receiver = $("#receiver").val();
var msg = $("#message").val();
log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg}));
$.ajax({
url: "/wsTemplate/sendToUser".type: "POST".dataType: "json".async: true.data: {
"receiver": receiver,
"msg": msg
},
success: function (data) {}}); }else {
layer.msg('STOMP connection not established, please connect.', {
offset: 'auto'
,icon: 2}); }}// Pull unread messages from the server
function pullUnreadMessage(destination) {
$.ajax({
url: "/wsTemplate/pullUnreadMessage".type: "POST".dataType: "json".async: true.data: {
"destination": destination
},
success: function (data) {
if(data.result ! =null) {
$.each(data.result, function (i, item) {
log(JSON.parse(item).content); })}else if(data.code ! =null && data.code == "500") {
layer.msg(data.msg, {
offset: 'auto'
,icon: 2}); }}}); }// Log output
function log(message) {
console.debug(message);
}
</script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<div id="connect-container" class="layui-elem-field">
<legend>Chat With STOMP Message</legend>
<div>
<input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
</div>
<div>
<button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
<button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="disconnect();">Disconnect
</button>
</div>
<div class="message">
<input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Name of recipient" value=""/>
<input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Message content" value=""/>
</div>
<div>
<button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="sendMessage();">Send Message
</button>
</div>
</div>
</div>
</body>
</html>
Copy the code
Test effect omitted, the specific effect can be run in two different servers above the example source code to view.