Background
On the front end, the company developed an active-push service robot: messages generated on the back end are pushed by the server to the clients (H5, iOS, Android). Users can toggle personalized settings to choose which message types they receive, and they can also ask questions. This post documents the general approach to deployment and implementation.
I would also like to thank my team lead for his help.
Deployment
Nginx configuration
- To keep the long-lived connection working, set the proxy HTTP version to 1.1.
- Set the Upgrade and Connection headers on the proxied request.
The complete configuration is as follows:
location / {
    proxy_pass http://nodes;
    # enable WebSockets
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
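To see why both settings matter: a WebSocket connection starts as an HTTP/1.1 request that asks to be upgraded, and nginx proxies with HTTP/1.0 and does not forward the hop-by-hop Upgrade and Connection headers unless told to. The sketch below is a simplified model of the check a backend performs, not nginx or netty-socketio code; UpgradeCheck and isWebSocketUpgrade are hypothetical names.

```java
import java.util.Map;

/** Hypothetical helper: decides whether a request asks for a WebSocket upgrade. */
final class UpgradeCheck {

    /** Header names are expected in lower case in this sketch. */
    static boolean isWebSocketUpgrade(String httpVersion, Map<String, String> headers) {
        // The Upgrade header is an HTTP/1.1 mechanism; HTTP/1.0 has no equivalent.
        if (!"HTTP/1.1".equals(httpVersion)) {
            return false;
        }
        String upgrade = headers.getOrDefault("upgrade", "");
        String connection = headers.getOrDefault("connection", "");
        // "Connection" may carry several comma-separated tokens, e.g. "keep-alive, Upgrade".
        boolean connectionHasUpgrade = false;
        for (String token : connection.split(",")) {
            if (token.trim().equalsIgnoreCase("upgrade")) {
                connectionHasUpgrade = true;
            }
        }
        return upgrade.equalsIgnoreCase("websocket") && connectionHasUpgrade;
    }
}
```

With the configuration above, the request that reaches the Socket.IO node satisfies this check; without it, the node only ever sees an ordinary HTTP/1.0 request.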
Socket configuration
Socket configuration class
import com.alibaba.fastjson.JSON; // assumption: the original does not show which JSON class it uses
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.corundumstudio.socketio.listener.ExceptionListener;
import com.corundumstudio.socketio.store.RedissonStoreFactory;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
// ResponseMessage and Response are project classes that the original does not show.

// Needed for the @Bean methods; fully qualified because the name clashes with netty-socketio's Configuration.
@org.springframework.context.annotation.Configuration
public class WebSocketConfig {
    private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    @Value("${wss.server.host}")
    private String host;
    @Value("${wss.server.port}")
    private Integer port;
    @Value("${redis.passwd}")
    private String redisPasswd;
    @Value("${redis.address}")
    private String redisAddress;

    @Bean
    public PubSubStore pubSubStore() {
        return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
    }

    @Bean
    public SocketIOServer socketIOServer() {
        Config redissonConfig = new Config();
        // Higher Redisson versions require the redis:// prefix in the address.
        // The database index is missing in the original; 0 (the Redisson default) is a placeholder.
        redissonConfig.useSingleServer()
                .setPassword(redisPasswd)
                .setAddress(redisAddress)
                .setDatabase(0);
        RedissonClient redisson = Redisson.create(redissonConfig);
        RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);

        Configuration config = new Configuration();
        config.setHostname(host);
        config.setPort(port);
        config.setHttpCompression(false);
        config.setWebsocketCompression(false);
        config.setStoreFactory(redisStoreFactory);
        // For cross-origin access, set the origin to null rather than "*".
        config.setOrigin(null);
        // Protocol upgrade timeout (ms), default 10000: time allowed for upgrading the HTTP handshake to WebSocket.
        config.setUpgradeTimeout(10000);
        // Ping interval (ms), default 25000: how often the client sends a heartbeat to the server.
        config.setPingInterval(25000);
        // Ping timeout (ms), default 60000: if no heartbeat arrives within this window, a timeout event is fired.
        config.setPingTimeout(60000);

        // All methods of the listener must be overridden.
        config.setExceptionListener(new ExceptionListener() {
            @Override
            public void onConnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Abnormal connection!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Abnormal connection!")));
            }

            @Override
            public void onDisconnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Disconnect exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Disconnect exception!")));
            }

            @Override
            public void onEventException(Exception e, List<Object> data, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Server exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Server exception!")));
            }

            @Override
            public void onPingException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "PING timeout exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "PING timeout exception!")));
            }

            @Override
            public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
                return false;
            }
        });

        // Works like a filter; no checks are performed here.
        config.setAuthorizationListener(data -> {
            // The following code can read client credentials from the URL parameters:
            // String appId = data.getSingleUrlParam("appId");
            // String source = data.getSingleUrlParam("source");
            // log.info("token {}, client {}", appId, source);
            return true;
        });
        return new SocketIOServer(config);
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}
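The two ping settings are easy to mix up, so here is a minimal model of the rule described in the comments: the client sends a heartbeat every ping interval, and the connection counts as timed out once no heartbeat has arrived for longer than the ping timeout. This is only an illustration of that rule, not netty-socketio's internal code; HeartbeatMonitor is a hypothetical class.

```java
/** Hypothetical model of the heartbeat rule described in the configuration comments. */
final class HeartbeatMonitor {
    private final long pingTimeoutMillis;
    private long lastHeartbeatMillis;

    HeartbeatMonitor(long pingTimeoutMillis, long connectedAtMillis) {
        this.pingTimeoutMillis = pingTimeoutMillis;
        this.lastHeartbeatMillis = connectedAtMillis;
    }

    /** Records a heartbeat received from the client. */
    void onHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    /** True once no heartbeat has arrived for longer than the timeout. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > pingTimeoutMillis;
    }
}
```

With the values used above (25000 and 60000), a client can miss one heartbeat without being considered timed out, since two intervals still fit inside the timeout window.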
Socket startup class
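import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@Order(value = 1)
public class ServerRunner implements CommandLineRunner {
    private final SocketIOServer server;

    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    // Starts the Socket.IO server once the Spring context is ready.
    @Override
    public void run(String... args) throws Exception {
        server.start();
        log.info("Socket.IO started successfully!");
    }
}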
The final architecture
The implementation process
The push service acts as a Kafka consumer: data producers push processed data to Kafka, and the consumer listens for messages and broadcasts them to the clients. Before pushing, it looks up each user's personalized settings in the database and sends only the message types that the client has chosen to receive.
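The filtering step can be sketched as follows. This is a minimal illustration, not the actual implementation: PreferenceFilter and recipients are hypothetical names, and the map stands in for the database lookup of each user's settings.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical filter: picks the connected users that opted in to a message type. */
final class PreferenceFilter {

    /**
     * @param enabledTypesByUser user id to the message types that user switched on
     *                           (stands in for the database lookup)
     * @param connectedUsers     user ids with a live connection on this node
     * @param messageType        type of the message about to be pushed
     * @return the connected users that should receive the message, in input order
     */
    static List<String> recipients(Map<String, Set<String>> enabledTypesByUser,
                                   List<String> connectedUsers,
                                   String messageType) {
        return connectedUsers.stream()
                // Users with no stored settings receive nothing in this sketch.
                .filter(user -> enabledTypesByUser.getOrDefault(user, Set.of()).contains(messageType))
                .collect(Collectors.toList());
    }
}
```

Whether users without stored settings should receive everything or nothing is a product decision; the sketch picks "nothing" only to keep the example short.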
Because the active push service is deployed on multiple nodes that all belong to the same Kafka consumer group, each node consumes only part of the full message stream. Redis's publish/subscribe mechanism solves this: after a node consumes a message, it publishes it to a topic; the other nodes subscribe to that topic and forward the message to the clients connected to them. Each node is therefore both a publisher and a subscriber.
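The fan-out can be modeled in memory to show that every node ends up with every message. This is a sketch under assumptions: InMemoryTopic stands in for the Redis topic, PushNode for one service node, and the topic is assumed not to echo a message back to its publisher, so the publishing node delivers to its own clients directly.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a Redis topic. */
final class InMemoryTopic {
    private final List<PushNode> subscribers = new ArrayList<>();

    void subscribe(PushNode node) {
        subscribers.add(node);
    }

    /** Delivers to every subscriber except the publisher itself. */
    void publish(PushNode origin, String message) {
        for (PushNode node : subscribers) {
            if (node != origin) {
                node.onTopicMessage(message);
            }
        }
    }
}

/** One push-service node with its own locally connected clients. */
final class PushNode {
    final List<String> deliveredToLocalClients = new ArrayList<>();
    private final InMemoryTopic topic;

    PushNode(InMemoryTopic topic) {
        this.topic = topic;
        topic.subscribe(this); // every node is a subscriber
    }

    /** Called when Kafka hands this node a record; only one node in the group gets it. */
    void onKafkaRecord(String message) {
        deliveredToLocalClients.add(message); // push to clients on this node
        topic.publish(this, message);         // let the other nodes push to theirs
    }

    /** Called for messages that another node consumed and published. */
    void onTopicMessage(String message) {
        deliveredToLocalClients.add(message);
    }
}

final class FanOutDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        PushNode a = new PushNode(topic);
        PushNode b = new PushNode(topic);
        // The consumer group splits the stream: m1 goes to node a, m2 to node b.
        a.onKafkaRecord("m1");
        b.onKafkaRecord("m2");
        System.out.println(a.deliveredToLocalClients); // [m1, m2]
        System.out.println(b.deliveredToLocalClients); // [m1, m2]
    }
}
```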
From data generation to consumption
Use Redisson’s Topic for distributed publish/subscribe
To make the Redis publish/subscribe mechanism easier to use, Redisson wraps it as a Topic and exposes publish/subscribe operations at the code level. Multiple JVM processes connected to the same Redis (standalone or cluster) can then exchange messages: a message published to a topic in one JVM process is received promptly by the other JVM processes subscribed to that topic.
Once netty-socketio is integrated with Redisson, it also uses this publish/subscribe mechanism internally.
Publishing messages
public void sendMessageToAllClient(String eventType, String message, String desc) {
    // Clients connected to this node
    Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
    for (final SocketIOClient client : clients) {
        // Do something
    }
    // Wrap the message in a packet and publish it for the other nodes
    Packet packet = new Packet(PacketType.MESSAGE);
    packet.setData(new BroadcastMessage(message, eventType, desc));
    publishMessage(packet);
}

private void publishMessage(Packet packet) {
    DispatchMessage dispatchMessage = new DispatchMessage("", packet, "");
    pubSubStore.publish(PubSubType.DISPATCH, dispatchMessage);
    // Not used further here; shows how the payload is read back from the packet
    BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();
}
Subscribing to messages
@PostConstruct
public void init() {
    pubSubStore.subscribe(PubSubType.DISPATCH, dispatchMessage -> {
        BroadcastMessage messageData = dispatchMessage.getPacket().getData();
        // Clients connected to this node
        Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
        for (final SocketIOClient client : clients) {
            // Do something
        }
    }, DispatchMessage.class);
}