I have done an IM project before, which involves the basic chat function, so pay attention to this series of articles is not practice, do not include the basic and step by step learning part, directly start the actual combat and thought guidance, the basic part needs extra supplement, I can follow up a series of articles if I have energy.

Why is the first article about chat rooms? Chat rooms are the easiest part to implement. IM structure is the most simple part, followed by single chat and group chat, business logic layer upon layer of incremental, thoroughly take down the chat room code, advanced single chat and group chat is very simple, I will launch the follow-up live broadcast room implementation.

If you simply want to achieve the chat room is very simple, but I will try to go through the whole process, in order to facilitate understanding.

It is mainly implemented by two functional classes: initialization class + response handling class

0. Preparation

Add the pom. XML

<dependency> <groupId> io.ty </groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency>Copy the code

1. Auxiliary interface implementation

Secondary interfaces make the service architecture clearer. There are two interfaces, one for Http requests and one for Webocket requests.

MyHttpService.java

Public interface MyHttpService {void handleHttpRequest(ChannelHandlerContext CTX, FullHttpRequest request); }Copy the code

MyWebSocketService.java

Public interface MyWebSocketService {void handleFrame(ChannelHandlerContext CTX, WebSocketFrame frame); }Copy the code

So the question is, who is going to implement these two classes, who is going to handle the distribution of these two requests.

See below service response processing categories: WebSocketServerHandler. Java

2. Request handling classes

Inherited SimpleChannelInboundHandler class, realize channelRead0 handlerAdded () () handlerRemoved exceptionCaught () () method, is the first choice method, Other methods allow us to do some tagging and follow-up.

WebSocketServerHandler.java

@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private MyHttpService httpService;
    private MyWebSocketService webSocketService;

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public WebSocketServerHandler(MyHttpService httpService, MyWebSocketService webSocketService) {
        super();
        this.httpService = httpService;
        this.webSocketService = webSocketService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            httpService.handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            webSocketService.handleFrame(ctx, (WebSocketFrame) msg);
        }
    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"Online"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
        channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"Offline")); } @override public void exceptionCaught(ChannelHandlerContext CTX, Throwable cause) throws Exception { channels.remove(ctx.channel()); ctx.close(); log.info("Exception message: {}",cause.getMessage()); }}Copy the code
  1. Create a ChannelGroup to store each channel that has been connectedhandlerAdded()In the methodchannels.add(ctx.channel());, corresponding tohandlerRemovedIn the methodremove.
  2. inchannelRead0()In the method, the recognition and processing of the request are realized.
  3. exceptionCaught()The method is to handle when an exception occurs.

3. Initialize the class implementation

@Slf4j public class WebSocketServer implements MyHttpService, MyWebSocketService {/ * * * handshake with variable * / private static final AttributeKey < WebSocketServerHandshaker > ATTR_HAND_SHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNEL_ID"); private static final int MAX_CONTENT_LENGTH = 65536; /** * Request type constant */ private static final String WEBSOCKET_UPGRADE ="websocket";
    private static final String WEBSOCKET_CONNECTION = "Upgrade";
    private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d"; /** * private String host; private int port; Private Map<ChannelId, Channel> channelMap = new HashMap<>(); private final String WEBSOCKET_URI_ROOT; public WebSocketServer(String host, int port) { this.host = host; this.port = port; WEBSOCKET_URI_ROOT = string. format(WEBSOCKET_URI_ROOT_PATTERN, host, port); } public voidstartEventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pl = channel.pipeline(); Channelmap.put (channel.id(),channel); log.info("new channel {}",channel);
                channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> {
                    log.info("channel close future {}",channelFuture); Remove (channelfuture.channel ().id())) from channelmap.remove (channelfuture.channel ().id())); }); // Add HTTP codec pl.addLast(new HttpServerCodec()); // aggregator pl.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)); Pl.addlast (new ChunkedWriteHandler()); Pl.addlast (new WebSocketServerHandler(webSocketServer. this, webSocketServer.this)); }}); /** * after instantiation, */ try {ChannelFuture ChannelFuture = bootstrap.bind(host,port).addListener((ChannelFutureListener)) channelFuture1 -> {if (channelFuture1.isSuccess()){
                    log.info("webSocket started");
                }
            }).sync();
            channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture12 ->
                    log.info("server channel {} closed.", channelFuture12.channel())).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("Failed to bind port");
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        log.info("webSocket shutdown"); } @override public void handleHttpRequest(ChannelHandlerContext CTX, FullHttpRequest Request) {// Check whether it is a socket requestif(isWebSocketUpgrade(request)){// If webSocket requests log.info("Request is webSocket protocol"); String subProtocols = request.headers(). Get (httpHeadernames.sec_websocket_protocol); / / handshake factory set uri + + agreement does not allow extension WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT,subProtocols,false); / / instantiate a handshake request from factory WebSocketServerHandshaker handshaker = handshakerFactory. NewHandshaker (request);if(handshaker = = null) {/ / handshake failure: the agreement does not support WebSocketServerHandshakerFactory. SendUnsupportedVersionResponse (CTX) channel ()); }elseHandshaker.handshake (ctx.channel(),request); Ctx.channel ().attr(ATTR_HAND_SHAKER).set(handshaker); }return;
        }else{// Do not process HTTP requests log.info("Do not process HTTP requests");
        }
    }

    @Override
    public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        /**
         * text frame handler
         */
        if (frame instanceof TextWebSocketFrame){
            String text = ((TextWebSocketFrame) frame).text();
            TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(text);
            log.info("Receive textWebSocketFrame from channel: {}, {} are currently online.",ctx.channel(),channelMap.size()); // Send to other channels (group chat function)for (Channel ch: channelMap.values()){
                if (ch.equals(ctx.channel())){
                    continue; } // Write the text frame to ch.writeAndFlush(textWebSocketFrame); log.info("Message sent to {}",ch);
                log.info("write text: {} to channel: {}",textWebSocketFrame,ctx.channel());
            }
            return; } /** * ping frame, reply pong frame */if (frame instanceof PingWebSocketFrame){
            log.info("receive pingWebSocket from channel: {}",ctx.channel());
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        /**
         * pong frame, do nothing
         */
        if (frame instanceof PongWebSocketFrame){
            log.info("receive pongWebSocket from channel: {}",ctx.channel());
            return;
        }
        /**
         * close frame, close
         */
        if (frame instanceof CloseWebSocketFrame){
            log.info("receive closeWebSocketFrame from channel: {}", ctx.channel()); / / get to shake hands information WebSocketServerHandshaker handshaker = CTX. Channel (). The attr (ATTR_HAND_SHAKER). The get ();if (handshaker == null){
                log.error("channel: {} has no handShaker", ctx.channel());
                return;
            }
            handshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain());
            return; } /** * the rest is binary frame, ignore */ log.warn("receive binary frame , ignore to handle"); } /** * Check whether it is a webSocket request */ private Boolean isWebSocketUpgrade(FullHttpRequest req) {HttpHeaders headers = req.headers();returnreq.method().equals(HttpMethod.GET) && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE) && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION); }}Copy the code
  1. l.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));Add your own response handling.WebSocketServerHandlerIs the request handling class implemented in the second point.
  2. private Map<ChannelId, Channel> channelMap = new HashMap<>();To save ChannelId and CHannel. Convenient for later corresponding acquisition.
  3. bootstrap.bind(host,port)You can also replace it with bind ports only.
    public ChannelFuture bind(String inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }
Copy the code
    public synchronized InetAddress anyLocalAddress() {
        if(anyLocalAddress == null) { anyLocalAddress = new Inet4Address(); / / {0 x00 to 0 x00 to 0 x00 to 0 x00} anyLocalAddress. Holder (). The hostName ="0.0.0.0";
        }
        return anyLocalAddress;
    }
Copy the code

It opens service to port 0.0.0.0 by default. 4. HandleHttpRequest and handleFrame are an implementation of MyWebSocketService. 5. There are notes for every detail. Read the notes carefully.

4. Start the service

public class Main {
    public static void main(String[] args) {
        new WebSocketServer("192.168.1.33",9999).start(); }}Copy the code
How to test in LAN?

I use a serve service from NPM for LAN. Website: www.npmjs.com/package/ser… React Package considerations and static file service setup

Make sure your phone and computer are on the LAN, so you can access your own group chat.

5. Front-end page

Give me a set if you want.

<! DOCTYPE html> <html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="Width = device - width, initial - scale = 1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
    <style type="text/css">
        .talk_con{
            width:600px;
            height:500px;
            border:1px solid # 666;
            margin:50px auto 0;
            background:#f9f9f9;
        }
        .talk_show{
            width:580px;
            height:420px;
            border:1px solid # 666;
            background:#fff;
            margin:10px auto 0;
            overflow:auto;
        }
        .talk_input{
            width:580px;
            margin:10px auto 0;
        }
        .whotalk{
            width:80px;
            height:30px;
            float:left;
            outline:none;
        }
        .talk_word{
            width:420px;
            height:26px;
            padding:0px;
            float:left;
            margin-left:10px;
            outline:none;
            text-indent:10px;
        }
        .talk_sub{
            width:56px;
            height:30px;
            float:left;
            margin-left:10px;
        }
        .atalk{
            margin:10px;
        }
        .atalk span{
            display:inline-block;
            background:#0181cc;
            border-radius:10px;
            color:#fff;
            padding:5px 10px;
        }
        .btalk{
            margin:10px;
            text-align:right;
        }
        .btalk span{
            display:inline-block;
            background:#ef8201;
            border-radius:10px;
            color:#fff;
            padding:5px 10px;
        }
    </style>
    <script type="text/javascript">
        //
        document.onkeydown = function (ev) {
            if (ev && ev.keyCode == 13){
                send();
                clear();
            }
        }
        var socket;
        if (window.WebSocket) {
            socket = new WebSocket(Ws: / / 192.168.1.33:9999 "");
            // socket = new WebSocket(Ws: / / 127.0.0.1:9999 "");
            // socket = new WebSocket(Ws: / / 192.168.43.186:9999 "");
            socket.onmessage = function (ev) {
                atalkAppendIn("Received:"+socket.channel + ":" + ev.data)
            };
            socket.onopen = function () {
                btalkAppendIn("Connection established");
            }
            socket.onclose = function () {
                btalkAppendIn("Connection closed");
            };
        }else {
            alert("Browser not supported");
        }
        function send(){
            var message = document.getElementById("talkwords");
            if(! window.WebSocket){return
            }
            if (socket.readyState === WebSocket.OPEN){
                socket.send(message.value);
                btalkAppendIn("Send."+ message.value);
                clear();
            } else {
                alert("WebSocket setup failed"); }}function atalkAppendIn(text) {
            var append = document.getElementById("words");
            append.innerHTML+= '<div class="atalk"><span>'+ text +'</span></div>';
        }

        function btalkAppendIn(text) {
            var append = document.getElementById("words");
            append.innerHTML+= '<div class="btalk"><span>'+ text +'</span></div>';
        }
        function clear () {
            var elementById = document.getElementById("talkwords");
            elementById.value = "";
        }

    </script>
</head>
<body>
<div class="talk_con">
    <div class="talk_show" id="words">
    </div>
    <div class="talk_input"> <! --<select class="whotalk" id="who"> -- > <! --<option value="0"</option>--> <! --<option value="1"</option>--> <! --</select>--> <inputtype="text" class="talk_word" id="talkwords">
        <input type="button" onclick="send()" value="Send" class="talk_sub" id="talksub">
    </div>
</div>
</body>
</html>
Copy the code
  1. The socket = new WebSocket (ws: / / 192.168.1.33: "9999");Note that IP and port correspond to the service one by one.
  2. socket.onmessage()Yes Obtain socket information.socket.onopenCreate a connection.socket.oncloseYes Close the connection.socket.send(message.value);Yes Indicates sending socket information.

Console output:

[nioEventLoopGroup 15:12:42. 443-3-6] INFO com.fantj.springbootjpa.net. Ty WebSocketServer - receive textWebSocketFrame from channel: [id: 0x0D08C657, L:/ 192.168.1.33:9999-r :/192.168.1.33:50440], At present a total of two online 15:12:42. 443 [nioEventLoopGroup - 3-6] INFO com.fantj.springbootjpa.net. Ty WebSocketServer - message has been sent to the [id: 0xacd5c1ad, L:/ 192.168.1.33:9999-r :/192.168.1.33:50438] 15:12:42.444 [nioEventLoopgroup-3-5] DEBUG Io.net ty. Handler. Codec. HTTP. Websocketx. WebSocket08FrameEncoder - Encoding WebSocket Frame opCode = 1 length = 5 15:12:42. 443  [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - write text: TextWebSocketFrame(data: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5,cap: 15)) to channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440]
Copy the code


If you like my article, please pay attention to my official account, which focuses on the analysis of architect technology. The official account is still in the initial stage, thank you for your support.

Java architecture