Introduction to WebSocket

The WebSocket protocol was designed from the ground up to provide a practical solution to the problem of bidirectional data transmission on the Web, allowing the client and the server to send messages to each other at any time.

Netty's support for WebSocket covers all the major implementations in use. We'll demonstrate it by building a WebSocket-based live chat application.

Example WebSocket application

We will implement a browser-based chat application using the WebSocket protocol, allowing multiple users to communicate with each other simultaneously.

The following diagram illustrates the logic of the application:

  1. The client sends a message
  2. The message will be broadcast to all other connected clients

Everyone can chat with everyone else. In our example we implement only the server; the client is a browser that accesses the chat room through a web page.
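
The broadcast logic described above can be sketched without any networking. The following is a minimal illustration that uses only the JDK; the class ChatRoomSketch and its methods are hypothetical names introduced here, and each client is modelled as a simple message sink rather than a real connection. Like the handler shown later in this article, the sketch delivers a message to every connected client, including the sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical in-memory model of the chat room; not part of the Netty server. */
final class ChatRoomSketch {

    // Thread-safe set of connected clients; each client is modelled as a message sink
    private final Set<Consumer<String>> clients = ConcurrentHashMap.newKeySet();

    void join(Consumer<String> client) {
        clients.add(client);
    }

    void leave(Consumer<String> client) {
        clients.remove(client);
    }

    /** Delivers the message to every connected client, including the sender. */
    void broadcast(String message) {
        for (Consumer<String> client : clients) {
            client.accept(message);
        }
    }

    public static void main(String[] args) {
        ChatRoomSketch room = new ChatRoomSketch();
        List<String> aliceInbox = new ArrayList<>();
        List<String> bobInbox = new ArrayList<>();
        room.join(aliceInbox::add);
        room.join(bobInbox::add);

        room.broadcast("hello");
        System.out.println(aliceInbox); // prints [hello]
        System.out.println(bobInbox);   // prints [hello]
    }
}
```

In the real server this role is played by Netty's ChannelGroup, which holds the connected Channels and writes a message to each of them.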

1. Add WebSocket support

Switching from standard HTTP or HTTPS to WebSocket uses a mechanism called the upgrade handshake. An application that uses WebSocket therefore always starts with HTTP/S and then performs the upgrade. Exactly when the upgrade happens is application-specific: it may be at startup or after a particular URL has been requested.

Our application follows this convention: if the requested URL ends in /ws, we upgrade the protocol to WebSocket; otherwise the server uses basic HTTP/S.

The following diagram illustrates how Netty handles the HTTP and WebSocket protocols, which is implemented by a set of ChannelHandlers.
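
To make the upgrade handshake a little more concrete, here is a minimal JDK-only sketch of two of its pieces. It is illustrative only: in the chat server Netty performs the handshake for us, and the class and method names (HandshakeSketch, acceptKey, shouldUpgrade) are assumptions introduced here. The first method computes the Sec-WebSocket-Accept response header the way RFC 6455 specifies (Base64 of the SHA-1 of the client's key concatenated with a fixed GUID). The second mirrors the URI check made by the HTTP handler later in this article, which compares the whole request URI with the configured path, ignoring case.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper; Netty's WebSocket handlers do this work in the real server. */
final class HandshakeSketch {

    // Fixed GUID that RFC 6455 defines for computing Sec-WebSocket-Accept
    private static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WEBSOCKET_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    /** Decides whether a request should be upgraded, given the configured WebSocket path. */
    static boolean shouldUpgrade(String requestUri, String wsUri) {
        return wsUri.equalsIgnoreCase(requestUri);
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ==")); // prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(shouldUpgrade("/ws", "/ws"));          // prints true
        System.out.println(shouldUpgrade("/index.html", "/ws"));  // prints false
    }
}
```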

2. Process HTTP requests

First, we implement the component that handles HTTP requests. It serves the web page that gives access to the chat room and displays the messages sent by connected clients.

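import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    // messageReceived is the Netty 5.0 alpha name; in Netty 4.x the equivalent method is channelRead0
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.uri())) {
            // If a WebSocket upgrade is requested, increment the reference count and pass the request to the next ChannelInboundHandler
            ctx.fireChannelRead(request.retain());
        } else {
            // Read index.html
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            DefaultHttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            // Write the HttpResponse to the client
            ctx.write(response);
            // Write index.html to the client; zero-copy transfer is only possible without SSL
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            // Write LastHttpContent and flush to the client
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            // Close the Channel after the write operation completes
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}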

3. Process WebSocket frames

The WebSocket RFC published by the IETF defines six types of frames, and Netty provides a POJO implementation for each of them. The following table lists these frame types and describes their usage.

Frame type                   Description
BinaryWebSocketFrame         Contains binary data
TextWebSocketFrame           Contains text data
ContinuationWebSocketFrame   Contains text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame
CloseWebSocketFrame          Represents a CLOSE request and contains a close status code and the reason for closing
PingWebSocketFrame           Requests the transmission of a PongWebSocketFrame
PongWebSocketFrame           Sent as a response to a PingWebSocketFrame
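
On the wire, each of these six frame types is identified by a 4-bit opcode in the first byte of the frame header. The sketch below pairs the opcodes defined in RFC 6455 with the Netty class names from the table. It uses only the JDK, and the enum FrameOpcode and its methods are hypothetical names introduced for illustration; Netty's own decoder does this mapping internally.

```java
import java.util.Optional;

/** Hypothetical enum pairing RFC 6455 opcodes with the Netty frame class names. */
enum FrameOpcode {
    CONTINUATION(0x0, "ContinuationWebSocketFrame"),
    TEXT(0x1, "TextWebSocketFrame"),
    BINARY(0x2, "BinaryWebSocketFrame"),
    CLOSE(0x8, "CloseWebSocketFrame"),
    PING(0x9, "PingWebSocketFrame"),
    PONG(0xA, "PongWebSocketFrame");

    final int code;
    final String nettyClass;

    FrameOpcode(int code, String nettyClass) {
        this.code = code;
        this.nettyClass = nettyClass;
    }

    /** Control frames (close, ping, pong) have the highest bit of the 4-bit opcode set. */
    boolean isControl() {
        return (code & 0x8) != 0;
    }

    /** Looks up the frame type from the low 4 bits of the first header byte. */
    static Optional<FrameOpcode> fromFirstByte(int firstByte) {
        int opcode = firstByte & 0x0F;
        for (FrameOpcode candidate : values()) {
            if (candidate.code == opcode) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty(); // reserved opcode
    }

    public static void main(String[] args) {
        // 0x81 = FIN bit set + text opcode
        System.out.println(fromFirstByte(0x81).get().nettyClass); // prints TextWebSocketFrame
    }
}
```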

The following code shows the ChannelInboundHandler that handles TextWebSocketFrame messages; it also tracks all active WebSocket connections in its ChannelGroup.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            // The handshake succeeded, so remove HttpRequestHandler because no more HTTP messages will be received
            ctx.pipeline().remove(HttpRequestHandler.class);
            // Notify all connected WebSocket clients that a new client has connected
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            // Add the new WebSocket Channel to the ChannelGroup
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    // messageReceived is the Netty 5.0 alpha name; in Netty 4.x the equivalent method is channelRead0
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // Increment the reference count of the message and write it to all connected clients in the ChannelGroup
        group.writeAndFlush(msg.retain());
    }
}

4. Initialize ChannelPipeline

To install the ChannelHandlers in the ChannelPipeline, we extend ChannelInitializer and implement its initChannel() method.

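import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerInitializer extends ChannelInitializer<Channel> {

    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        // Install all required ChannelHandlers in the pipeline of the new Channel
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}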

The call to initChannel() sets up the ChannelPipeline of the newly registered Channel by installing all the required ChannelHandlers. Netty's WebSocketServerProtocolHandler handles all mandated WebSocket frame types as well as the upgrade handshake itself. If the handshake succeeds, the ChannelHandlers that are needed are added to the ChannelPipeline, and those that are no longer needed are removed.

5. Bootstrap

The final step is to bootstrap the server and install the ChatServerInitializer. This is handled by the ChatServer class.

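import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;

import java.net.InetSocketAddress;

public class ChatServer {

    // Holds all connected WebSocket Channels
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new ChatServerInitializer(group);
    }

    // Closes the server Channel and all client Channels, then releases the event loop threads
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread(endpoint::destroy));
        // Block until the server Channel is closed
        future.channel().closeFuture().syncUninterruptibly();
    }
}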
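
Although the client in this article is a browser, the running server can also be exercised from Java. The following is a rough sketch that uses the WebSocket client built into the JDK (java.net.http, Java 11 or later); it is not part of the original example. The class name ChatClientSketch, the host localhost, and the message text are assumptions, while the /ws path matches the one configured in ChatServerInitializer. It expects the server port as its only argument.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

/** Hypothetical test client; assumes a ChatServer is already listening on the given port. */
final class ChatClientSketch {

    /** Builds the endpoint URI; "/ws" is the path that triggers the upgrade on the server. */
    static URI endpoint(String host, int port) {
        return URI.create("ws://" + host + ":" + port + "/ws");
    }

    public static void main(String[] args) throws Exception {
        int port = Integer.parseInt(args[0]); // the port the ChatServer was started with

        WebSocket.Listener printer = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                System.out.println("received: " + data);
                webSocket.request(1); // ask for the next message
                return null;
            }
        };

        // Performs the HTTP upgrade handshake and returns the open WebSocket
        WebSocket webSocket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(endpoint("localhost", port), printer)
                .join();

        webSocket.sendText("hello from Java", true).join();
        Thread.sleep(1000); // give the broadcast time to arrive
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "bye").join();
    }
}
```

Because the server writes every text frame to the whole ChannelGroup, the client should see its own message echoed back along with messages from any other connected clients.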