Hello everyone, I am IT elder brother, today we will use Netty to achieve a group chat system \
The first is the server-side code
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
/** * Write the run method to handle the client request */
public void run(a) throws Exception{
// Create two thread groups
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
Eight NioEventLoop / /
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// BACKLOG is used to construct a ServerSocket object that identifies when the server request processing thread is full,
// The maximum length of the queue used to temporarily hold requests that have completed the three-way handshake. If it is not set or the value set is less than 1, Java uses the default value 50.
.option(ChannelOption.SO_BACKLOG, 128)
// Whether to enable the heartbeat keepalive mechanism. After a connection is ESTABLISHED between TCP sockets (both enter the ESTABLISHED state)
// This mechanism is activated only when there is no data transfer from the upper layer for about two hours.
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Get pipeline
ChannelPipeline pipeline = ch.pipeline();
// Add the decoder to the pipeline
pipeline.addLast("decoder".new StringDecoder());
// Add encoder to pipeline
pipeline.addLast("encoder".new StringEncoder());
// Add your own business handler
pipeline.addLast(newGroupChatServerHandler()); }}); System.out.println("Netty Server startup");
ChannelFuture channelFuture = b.bind(port).sync();
// The listener is closed
channelFuture.channel().closeFuture().sync();
}finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run(); }}Copy the code
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
/ * * * GlobalEventExecutor. INSTANCE) is a global event actuator, is a singleton * /
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/** * Adds the current channel to the channelGroup * to indicate that the connection is established. Once the connection is established, the first one is executed@param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// Push the chat information of this client to other online clients
/* This method iterates through all channels in the channelGroup and sends messages. We don't need to iterate */ ourselves
channelGroup.writeAndFlush("[client]" + channel.remoteAddress() + "Join the chat" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
/** * Disconnect and push xx customer departure information to the current online customer *@param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[client]" + channel.remoteAddress() + "Left \n");
System.out.println("channelGroup size" + channelGroup.size());
}
/** * indicates that the channel is active, indicating that xx is online@param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Online ~");
}
/** * indicates that the channel is inactive, indicating that xx is offline *@param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Offline ~");
}
/** * Read data *@param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// Get the current channel
Channel channel = ctx.channel();
// We iterate through channelGroup and send back different messages depending on the situation
channelGroup.forEach(ch -> {
if(channel ! = ch) {// Not the current channel, forward the message
ch.writeAndFlush("[customer]" + channel.remoteAddress() + "Sent a message:" + msg + "\n");
}else {// Echo the message you sent to yourself
ch.writeAndFlush("[himself] sent the message." + msg + "\n"); }}); }@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the channelctx.close(); }}Copy the code
Client code
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/ / get the pipeline
ChannelPipeline pipeline = ch.pipeline();
// Add the relevant handler
pipeline.addLast("decoder".new StringDecoder());
pipeline.addLast("encoder".new StringEncoder());
// Add a custom handler
pipeline.addLast(newGroupChatClientHandler()); }}); ChannelFuture channelFuture = bootstrap.connect(host, port).sync();/ / get the channel
Channel channel = channelFuture.channel();
System.out.println("-- -- -- -- -- -- --" + channel.localAddress()+ "-- -- -- -- -- -- -- --");
// The client needs to enter information to create a scanner
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
// Send to the server through channel
channel.writeAndFlush(msg + "\r\n"); }}finally {
group.shutdownGracefully(); }}public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1".7000).run(); }}Copy the code
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); }}Copy the code
Give a [look], is the biggest support for IT elder brotherCopy the code