1. Introduction
A NioEventLoopGroup can be created with a user-supplied Executor, for example a thread pool from the JDK. There is a constructor like this:
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
- nThreads: the number of threads in the EventLoopGroup
- executor: a user-defined Executor
In this article the executor is a thread pool implemented by the JDK.
Question: what happens when nThreads is greater than the maximum number of threads in the thread pool?
The code below reproduces the problem, and the analysis that follows shows how to avoid it.
2. Example code
Server code:
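import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Each public class below belongs in its own .java file, with the imports it uses.
public class DiscardServer {
    private final int port;
    public DiscardServer(int port) {
        this.port = port;
    }
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // nThreads is 3, but the custom thread pool can run at most 2 threads.
        EventLoopGroup workerGroup = new NioEventLoopGroup(3, Executors.newFixedThreadPool(2, new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger();
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Thread-mxsm-" + threadNumber.incrementAndGet());
            }
        }));
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeServerInHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new DiscardServer(port).run();
    }
}

@Sharable
public class TimeServerInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(this.getClass().getSimpleName() + "--channelRead");
        // The request payload is not used; release it so the buffer does not leak.
        ReferenceCountUtil.release(msg);
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        ctx.writeAndFlush(time);
    }
}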
Client code:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;

// Each public class below belongs in its own .java file, with the imports it uses.
public class TimeClient {
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; ++i) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        new TimeClient().test();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    public void test() throws Exception {
        String host = "127.0.0.1";
        int port = Integer.parseInt("8080");
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // ch.pipeline().addLast(new TimeClientOutHandler());
                    ch.pipeline().addLast(new TimeClientInHandler());
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();
            ByteBuf byteBuf = Unpooled.buffer();
            byteBuf.writeBytes("1111".getBytes(StandardCharsets.UTF_8));
            f.channel().writeAndFlush(byteBuf);
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

public class TimeClientInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(this.getClass().getSimpleName() + "--channelRead");
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            // ctx.close();
        } finally {
            m.release();
        }
    }
}
Run the server code and the client code separately, then run the command:
jps
Then find the PID of the server process in the output and run:
jstack -l <pid>
Running results: [image: jstack thread dump of the server process]
3. Result analysis
The thread dump shows that only two threads were created for the workerGroup, which means that in this case the number of threads is determined by the thread pool. Let's look at the source code to see why:
The NioEventLoopGroup constructor calls the MultithreadEventExecutorGroup constructor.
If the executor argument is null, Netty falls back to its own implementation, ThreadPerTaskExecutor.
Netty's Executor creates a new thread each time it executes a submitted task, and that thread is then bound to an EventLoop. If the user passes in a non-null Executor, the supplied Executor is used instead.
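The thread-per-task behaviour described above can be sketched with the JDK alone. This is only an illustration, not Netty's source: the names PerTaskExecutorSketch, PerTaskExecutor and distinctThreads are made up for this example, and the tasks are trivial stand-ins for an EventLoop's work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class PerTaskExecutorSketch {

    /** Hypothetical stand-in: starts a brand-new thread for every submitted task. */
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        PerTaskExecutor(ThreadFactory factory) {
            this.factory = factory;
        }

        @Override
        public void execute(Runnable command) {
            factory.newThread(command).start();
        }
    }

    /** Submits taskCount tasks and returns how many distinct threads ran them. */
    static int distinctThreads(Executor executor, int taskCount) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has recorded its thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        Executor perTask = new PerTaskExecutor(Executors.defaultThreadFactory());
        // Three tasks, three freshly created threads.
        System.out.println("distinct threads: " + distinctThreads(perTask, 3));
    }
}
```

Because such an executor never reuses or limits threads, every task submitted to it is guaranteed a thread of its own, which a bounded thread pool cannot promise.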
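The difference between a user-supplied Executor and Netty's own implementation is this: with Netty's Executor every EventLoop gets a newly created thread, while with a user-supplied thread pool an EventLoop gets a thread only if the pool can still provide one.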
Summary: when using a custom Executor, it is best to keep its number of threads equal to nThreads in the NioEventLoopGroup constructor. If the pool has fewer threads, the surplus EventLoops cannot get a thread to run on, which reduces the throughput of the workerGroup.
The nThreads parameter of the NioEventLoopGroup constructor is really the number of EventLoops; Netty does not strictly require that each EventLoop is backed by a thread of its own.
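The mismatch between nThreads and the pool size can be reproduced with the JDK alone. The following is a minimal sketch, not Netty code: each "loop" is a task that blocks until released, standing in for an EventLoop that occupies its thread for as long as it lives. The names StarvedLoopSketch and startedLoops are hypothetical, and the 300 ms wait is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class StarvedLoopSketch {

    /**
     * Submits loopCount long-running "loops" to a fixed pool of poolSize threads
     * and returns how many of them had started when the wait window ended.
     */
    static int startedLoops(int loopCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch stop = new CountDownLatch(1);
        try {
            for (int i = 0; i < loopCount; i++) {
                pool.execute(() -> {
                    started.incrementAndGet();
                    try {
                        stop.await(); // stands in for a loop that does not return
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            Thread.sleep(300); // give the pool time to start whatever it can
            return started.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            stop.countDown();   // release the running loops
            pool.shutdownNow(); // discard loops still waiting in the queue
        }
    }

    public static void main(String[] args) {
        System.out.println("pool=2, loops=3 -> started " + startedLoops(3, 2));
        System.out.println("pool=3, loops=3 -> started " + startedLoops(3, 3));
    }
}
```

With a pool of 2 and 3 loops, the third task stays in the pool's queue and never starts, which matches the two workerGroup threads seen in the thread dump. With a pool of 3, all three loops start.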
4. Summary
- When creating a NioEventLoopGroup, prefer Netty's default Executor implementation. This avoids the problem described above.
- When processing Channel data, a JDK thread pool can still be used as a separate business thread pool for business logic.
- In short: either use Netty's default Executor, or make the custom thread pool's size equal to nThreads.
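The second point above, keeping business logic on a separate JDK thread pool, might look roughly like the following JDK-only sketch. It is a simulation under stated assumptions rather than Netty code: the single thread named "io-1" stands in for an EventLoop thread, and the class name BusinessPoolSketch, the method threadThatRanBusinessLogic and the thread names are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BusinessPoolSketch {

    /**
     * "Reads" a message on the I/O thread, hands the processing to the business
     * pool, and returns the name of the thread that ran the business logic.
     */
    static String threadThatRanBusinessLogic() {
        // Stand-in for an EventLoop thread (assumption, not a Netty class).
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        // Separate pool for potentially slow business logic.
        ExecutorService businessPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "biz"));
        try {
            CompletableFuture<String> result = CompletableFuture
                // the message is produced on the I/O thread
                .supplyAsync(() -> "1111", ioThread)
                // the processing step is executed by the business pool
                .thenApplyAsync(msg -> Thread.currentThread().getName(), businessPool);
            return result.join();
        } finally {
            ioThread.shutdown();
            businessPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("business logic ran on: " + threadThatRanBusinessLogic());
    }
}
```

The design choice is that the I/O thread only hands the message over and stays free for other channels, while the business pool can be sized independently of nThreads.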
I am Ant Back Elephant. If this article helped you, please like and follow; if anything in it is incorrect, please point it out in the comments. Thank you!