Netty learning: communication between a WebSocket long connection and a plain Socket
I have been learning Netty recently and followed a tutorial to write a WebSocket-based web chat room, which gave me a working understanding of the framework. A current project happens to need long connections, so I chose Netty for it.
Project objective: client A (a web page) communicates with the server over WebSocket, client B communicates with the server over a plain Socket, and the data sent by client B must be delivered to client A. The server acts as the bridge between them.
The Socket server listens on port 8090 and the long-connection (WebSocket) server listens on port 8089. Client A connects to port 8089 and client B connects to port 8090.
Since data on the two ports must be processed differently, we create two ServerBootstrap instances and bind each one to its own port. serverGzhBootstrap handles the Socket communication between client B and the server; serverWxQBootstrap handles the WebSocket long-connection communication between client A and the server.
- ServerInitializer extends ChannelInitializer and sets up the handlers for the communication between client B and the server.
- WebSocketChannelInitializer extends ChannelInitializer and sets up the handlers for the long-connection communication between client A and the server.
- ServerInitializer adds a custom SimpleChannelInboundHandler that handles the Socket communication between client B and the server.
- WebSocketChannelInitializer adds a custom SimpleChannelInboundHandler that handles the WebSocket long-connection communication between client A and the server.
The web chat room acts as client A. Client B connects over a Socket, reads console input, and sends it to the server as its message; the server then forwards it to client A.
Question:
Netty's SimpleChannelInboundHandler declares the type of inbound message it accepts through its generic parameter and only handles messages of that type. Client B sends String messages, while client A exchanges TextWebSocketFrame messages, so a message from client B cannot reach client A as it is.
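The type restriction described above can be modelled with the JDK alone. This is only a sketch: TypedHandler and TextFrame are hypothetical stand-ins for Netty's SimpleChannelInboundHandler and TextWebSocketFrame, not the real classes, and the sketch models nothing beyond the idea that a handler typed for String skips every message that is not a String.

```java
import java.util.ArrayList;
import java.util.List;

final class TypeMismatchDemo {

    /** Hypothetical stand-in for a handler that only accepts messages of type T. */
    abstract static class TypedHandler<T> {
        private final Class<T> type;

        TypedHandler(Class<T> type) {
            this.type = type;
        }

        /** Handles msg only if it is an instance of T; returns whether it was handled. */
        final boolean offer(Object msg) {
            if (!type.isInstance(msg)) {
                return false; // wrong type: this handler never sees the message
            }
            handle(type.cast(msg));
            return true;
        }

        abstract void handle(T msg);
    }

    /** Hypothetical stand-in for a WebSocket text frame. */
    static final class TextFrame {
        final String text;

        TextFrame(String text) {
            this.text = text;
        }
    }

    /** Feeds every message to a String-only handler and returns the ones it handled. */
    static List<String> handledByStringHandler(Object... messages) {
        List<String> handled = new ArrayList<>();
        TypedHandler<String> handler = new TypedHandler<String>(String.class) {
            @Override
            void handle(String msg) {
                handled.add(msg);
            }
        };
        for (Object message : messages) {
            handler.offer(message);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Only the String gets through; the frame is skipped.
        System.out.println(handledByStringHandler("hello", new TextFrame("hi from the web page")));
        // prints [hello]
    }
}
```

The same mismatch applies in the other direction, which is why the server has to convert between the two message types itself.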
Solution:
The Socket server receives the String message sent by client B, wraps it in a TextWebSocketFrame, and writes it to client A. To do that we need client A's channel, on which we can then call writeAndFlush.
How do we get client A's channel?
We define a class that holds a global ChannelGroup for all client channels. Every time a client channel is created (in the handlerAdded method), we add it to the global ChannelGroup. When a channel is closed, the ChannelGroup removes it automatically, so the group always gives us all live client channels.
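The "add on creation, drop automatically on close" behaviour can be illustrated with a small JDK-only model. Conn and Group below are hypothetical names invented for this sketch; they are not Netty classes and this is not how Netty's ChannelGroup is implemented. The sketch only assumes that each connection exposes a future that completes when it closes.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AutoCleaningGroupDemo {

    /** Hypothetical connection with a future that completes when it is closed. */
    static final class Conn {
        final String name;
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        Conn(String name) {
            this.name = name;
        }

        void close() {
            closeFuture.complete(null);
        }
    }

    /** Hypothetical group that forgets a connection as soon as it closes. */
    static final class Group {
        private final Set<Conn> members = ConcurrentHashMap.newKeySet();

        boolean add(Conn conn) {
            boolean added = members.add(conn);
            if (added) {
                // Register the clean-up once, at the moment the connection joins.
                conn.closeFuture.thenRun(() -> members.remove(conn));
            }
            return added;
        }

        int size() {
            return members.size();
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        Conn web = new Conn("client A");
        Conn socket = new Conn("client B");
        group.add(web);
        group.add(socket);
        System.out.println(group.size()); // prints 2
        socket.close();                   // no explicit remove call needed
        System.out.println(group.size()); // prints 1
    }
}
```

Because the group cleans itself up, the handlers only ever need to call add and never have to track disconnects.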
How do we tell which channels belong to client A?
The key difference between client A and client B is the server port they connect to, so we can tell them apart by the port number in each channel's local address.
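One way to make the port check explicit is to compare the numeric port instead of matching on the address string, since a string test such as endsWith("8089") would also match a port like 18089. The sketch below uses only the JDK; the class name ClientKind and its methods are hypothetical helpers, and it assumes the address passed in is what a TCP channel's localAddress() returns (an InetSocketAddress).

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/** Classifies a connection by the server-side (local) port it was accepted on. */
final class ClientKind {
    // Ports used in this article: 8089 serves WebSocket (client A), 8090 serves Socket (client B).
    static final int WEBSOCKET_PORT = 8089;
    static final int SOCKET_PORT = 8090;

    /** Returns true if the connection was accepted on the WebSocket port. */
    static boolean isClientA(SocketAddress localAddress) {
        return portOf(localAddress) == WEBSOCKET_PORT;
    }

    /** Returns true if the connection was accepted on the plain Socket port. */
    static boolean isClientB(SocketAddress localAddress) {
        return portOf(localAddress) == SOCKET_PORT;
    }

    private static int portOf(SocketAddress address) {
        if (address instanceof InetSocketAddress) {
            return ((InetSocketAddress) address).getPort();
        }
        return -1; // unknown address type: matches neither port
    }

    public static void main(String[] args) {
        SocketAddress webSocketSide = new InetSocketAddress("127.0.0.1", 8089);
        SocketAddress socketSide = new InetSocketAddress("127.0.0.1", 8090);
        SocketAddress lookalike = new InetSocketAddress("127.0.0.1", 18089);

        System.out.println(isClientA(webSocketSide)); // prints true
        System.out.println(isClientB(socketSide));    // prints true
        System.out.println(isClientA(lookalike));     // prints false
    }
}
```

In a handler this would be called as ClientKind.isClientA(channel.localAddress()), assuming the helper is on the classpath.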
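- Global ChannelGroup
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class GlobalChannelGroup {
    // Shared by both servers; holds every connected client channel
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}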
Server side
- Server bootstrap (startup class)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    public static void main(String[] args) throws InterruptedException {
        // Two event loop groups: boss accepts connections and hands them to worker
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverWxQBootstrap = new ServerBootstrap();
            ServerBootstrap serverGzhBootstrap = new ServerBootstrap();
            // For each bootstrap: set the groups, the channel class (created via reflection),
            // and the custom child handler that initializes each accepted channel
            System.out.println("start server");
            serverGzhBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer());
            serverWxQBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketChannelInitializer());
            ChannelFuture gzh = serverGzhBootstrap.bind(8090).sync(); // Socket port (client B)
            ChannelFuture wxq = serverWxQBootstrap.bind(8089).sync(); // WebSocket port (client A)
            gzh.channel().closeFuture().sync();
            wxq.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
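- Server Socket port initializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Inbound: read a 4-byte length field at offset 0, then strip those 4 bytes
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        // Outbound: prepend a 4-byte length field
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // For decoding
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // For encoding
        pipeline.addLast(new ServerHandler()); // Custom handler
    }
}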
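The two length-field handlers in this pipeline put a 4-byte length in front of every message and strip it again on the receiving side. The following JDK-only sketch shows that wire format under the assumption of a big-endian length that counts only the message body; LengthPrefixFraming is a hypothetical class written for illustration and does not use or reproduce Netty's codecs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixFraming {

    /** Encodes a message as [4-byte big-endian body length][UTF-8 body]. */
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /**
     * Decodes every complete frame in the buffer and returns the message bodies.
     * Bytes of an incomplete trailing frame are left unread in the buffer.
     */
    static List<String> decode(ByteBuffer stream) {
        List<String> messages = new ArrayList<>();
        while (stream.remaining() >= 4) {
            stream.mark();
            int length = stream.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (stream.remaining() < length) {
                stream.reset(); // not enough bytes yet: rewind to the length field
                break;
            }
            byte[] body = new byte[length];
            stream.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello");
        byte[] second = encode("wall");
        ByteBuffer stream = ByteBuffer.allocate(first.length + second.length);
        stream.put(first).put(second);
        stream.flip();
        // Two messages sent back to back are split apart again by their length prefixes.
        System.out.println(decode(stream)); // prints [hello, wall]
    }
}
```

Framing like this is what lets the String decoder see one whole message at a time even when TCP delivers several messages in one read or one message across several reads.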
- Server Socket port handler
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "," + msg);
        // Acknowledge to client B
        ctx.channel().writeAndFlush("The message has entered the database and is on its way to the WeChat wall!");
        GlobalChannelGroup.channelGroup.forEach(o -> {
            // A local address ending in 8089 means the channel belongs to client A
            if (o.localAddress().toString().endsWith("8089")) {
                // Client A only understands WebSocket frames, so wrap the String
                TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + " send message: " + msg + "\n");
                o.writeAndFlush(text);
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ": connected to WeChat wall mode successfully!");
        int size = GlobalChannelGroup.channelGroup.size();
        System.out.println("Connected clients: " + (size == 0 ? 0 : size - 1));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Register every new client B channel in the global group
        Channel channel = ctx.channel();
        GlobalChannelGroup.channelGroup.add(channel);
    }
}
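- Server long-connection (WebSocket) port initializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        // Aggregates the parts of an HTTP message into a single FullHttpRequest
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/")); // Pass the WebSocket path
        pipeline.addLast(new TextWebSocketHandler()); // Custom handler
    }
}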
- Server long-connection handler
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class TextWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        int size = GlobalChannelGroup.channelGroup.size();
        System.out.println("Connected clients: " + (size == 0 ? 0 : size - 1));
        System.out.println("received message: " + msg.text());
        GlobalChannelGroup.channelGroup.forEach(o -> {
            if (o.localAddress().toString().endsWith("8090")) {
                // Client B channels expect plain Strings
                o.writeAndFlush(msg.text());
            } else {
                // Client A channels expect WebSocket text frames
                TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + " send message: " + msg.text() + "\n");
                o.writeAndFlush(text);
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Register every new client A channel in the global group
        Channel ch = ctx.channel();
        GlobalChannelGroup.channelGroup.add(ch);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ": leave chat room");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        System.out.println(ch.remoteAddress() + ": connect to chat room");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Exception");
        ctx.close();
    }
}
Client side
The client reads a line from the console's standard input, passes it in as a constructor parameter, and sends it once the client channel becomes active.
- Client B bootstrap (startup class)
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;

public class GzhClient {
    public static void main(String[] args) {
        // Created before the try block so the finally block never sees null
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // Read one line from the console; it is sent when the channel becomes active
            Scanner scanner = new Scanner(System.in);
            String json = scanner.nextLine();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors).channel(NioSocketChannel.class)
                    .handler(new GzhClientInitializer(json));
            System.out.println("start client");
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8090).sync();
            // channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}
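- Client B initializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class GzhClientInitializer extends ChannelInitializer<SocketChannel> {
    private String json;

    public GzhClientInitializer(String json) {
        this.json = json;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Same framing and String codecs as the server's Socket port
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new GzhClientHandler(json));
    }
}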
- Client B handler for Socket communication with the server
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GzhClientHandler extends SimpleChannelInboundHandler<String> {
    private String json;

    public GzhClientHandler(String json) {
        this.json = json;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("client receive:" + msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the console input as soon as the connection is established
        System.out.println("Entered WeChat wall mode, sending the message:");
        ctx.writeAndFlush(json);
    }
}
Test
- Start the server and start two instances of client B (GzhClient)
- Open the chat room page so that it establishes a long connection
- Send a message from the console of each client B
The server receives each message, prints it, and forwards it to client A (the web chat room).
The chat room receives the messages from the server.