Netty implements the Reactor model. The following two figures illustrate the process of Netty

Netty encoding mode:

public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}Copy the code

Follow the source code using EventLoopGroup as an entry point.

An EventLoopGroup is defined as follows:

//Return the next EventLoop to use  
EventLoop next();  
//Register a Channel with this EventLoop. The returned ChannelFuture will get notified once the registration was complete.  
ChannelFuture register(Channel channel);   
Copy the code

NioEventLoopGroup class initialization: the default call is:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); }}Copy the code

The default is 2* core CPU threads (nThreads). The final initialization method called:

Create a new instance. Params: NThreads -- The number of threads that will be used by this instance. Executor -- The executor to use Or null if the default should be 2. ChooserFactory - the EventExecutorChooserFactory to use. The args - the arguments which will passed to each newChild(Executor, Object...) call protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (! success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (! e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length)  { terminationFuture.setSuccess(null); }}}; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }Copy the code

The newChild method in our code is the newChild method of the NioEventLoopGroup when we start the nThreads EventExecutor.

ServerBootstrap is a bootstrap helper class that assigns values to member variables of the ServerBootstrap class, which is used when calling bind.

serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer());

The specified NioServerSocketChannel. Class through reflection to generate instances of the class:

@Override public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); }}Copy the code

ByteBuf->ByteBUffer ->Future ->Future The corresponding listener receives the notification immediately. The sync method for the Future –> wait for the Future to complete. Waits for this future until it is done, and rethrows the cause of the failure if this future failed.