Netty is an asynchronous and event-driven network application framework, which supports fast and easy development of maintainable high-performance servers and clients.
So-called event driven is determined by through all sorts of incident response program process, in the Netty everywhere is full of asynchronous and event driven, this feature allows the application to response at any point in time in any order of events, it brings a very high scalability and let your application can work in the need to deal with the growing, To accommodate this growth in some feasible way or by expanding its processing capacity.
Netty provides high performance and ease of use. It has the following features:
-
Has a well-designed and unified API, support NIO and OIO (blocking IO) and other transport types, support true connectionless UDP Socket.
-
Simple and powerful threading model with highly customizable threads (pools).
-
Good modularity and decoupling, supporting extensible and flexible event models that make it easy to separate concerns for reuse of logical components (pluggable).
-
High performance, higher throughput than the Java core API, with zero-copy functionality to achieve minimal memory copy consumption.
-
Built-in many common protocol codecs, such as HTTP, SSL, WebScoket and other common protocols can be done out of the box through Netty. Users can also use Netty to easily implement their own application-layer protocols.
Most people use Netty primarily to improve application performance, and high performance requires non-blocking IO. Netty’s non-blocking IO is based on and encapsulated in Java NIO. (Using the Java NIO API directly is a tedious and error-prone operation in a highly complex application, and Netty encapsulates these complex operations for you.)
NIO, which can be called New or non-blocking IO, is much more efficient in terms of performance than Java’s old blocking IO. (If you create a separate thread for each IO operation on a connection, then blocking IO doesn’t lag behind NIO in terms of performance, but you can’t create an infinite number of threads. Bad in the case of a very large number of connections).
-
ByteBuffer: NIO data transfers are buffer-based, and ByteBuffer is the buffer abstraction used in NIO data transfers. ByteBuffer supports allocating memory outside the heap and tries to avoid redundant copying during I/O operations. General I/O operations require system calls, it would be to switch to the kernel, the kernel configuration to make the data read from the file to its buffer, only after the data preparation, will write data from the kernel state user mode, the so-called blocking IO that means waiting for data prepared for blocking this period of time. If you want to avoid this extra kernel manipulation, you can use MMAP (virtual memory mapping) to allow users to manipulate files directly.
-
Channel: Similar to a file descriptor, it simply represents an entity (such as a hardware device, file, Socket, or a program component that can perform one or more different I/O operations). You can read data from a Channel into a buffer, and you can write data from a buffer into a Channel.
-
The Selector: Selector is the key to NIO implementation. NIO uses I/O multiplexing to achieve non-blocking. Selector listens for each Channel’s IO events in a thread to determine which channels are ready for I/O operations. Thus, the completion status of any read or write operation can be checked at any time. This approach avoids blocking while waiting for IO operations to prepare data, allows fewer threads to handle many connections, and reduces thread switching and maintenance overhead.
With NIO’s implementation ideas in mind, I felt it was important to take a look at the I/O model in Unix, which has the following five I/O models:
-
Blocking I/O (Blocking I/O)
-
Non-blocking I/O (I/O)
-
I/O multiplexing (select and poll)
-
Signal Driven I/O (SIGIO)
-
Asynchronous I/O (Asynchronous I/O (the POSIX AIo_functions))
The blocking I/O model is the most common I/O model, and InputStream/OutputStream is usually based on the blocking I/O model. In the figure above, we use UDP as an example. The recvfrom() function is the UDP function used to receive data. It uses a system call and blocks until the kernel is ready for the data, after which the kernel buffer copies the data into the user state (recvfrom() receives the data). Blocking is doing nothing while waiting for the kernel to prepare data.
For example, blocking I/O is like going to a restaurant and sitting around waiting for your meal to be ready (non-blocking I/O if you’re on your phone).
In the non-blocking I/O model, the kernel returns an error code, EWOULDBLOCK, if the data is not ready, while recvFROM does not block hibernation in the event of failure, but instead continually asks the kernel if it is ready. In the figure above, the first three times the kernel returns EWOULDBLOCK, Until the fourth query, the kernel data is ready, and then the kernel cache is copied to user mode. This method of constantly asking the kernel to see if a certain state was completed is called polling.
Non-blocking I/O is like ordering takeout, only you’re in a hurry and have to call the delivery guy every once in a while to see if he’s there.
The idea of I/O multiplexing is the same as non-blocking I/O, except that in non-blocking I/O, the kernel is polled in the user state (or a thread) of recvFROM, which consumes a lot of CPU time. I/O multiplexing takes care of polling via select() or poll() system calls to listen for the status of I/O read and write events. As shown in the figure above, when SELECT listens for a Datagram readable, recvFROM sends a system call to copy the kernel data into user mode.
The advantage of this approach is obvious, with I/O multiplexing you can listen to multiple file descriptors and do the monitoring in the kernel. The disadvantage is that at least two system calls (select() and recvfrom()) are required.
I/O multiplexing also works for the example of ordering food, except you can do your own thing while you wait for your food to arrive, either via a food APP or via a phone call from the delivery guy.
Two I/O multiplexing functions, SELECT () and poll(), are provided in Unix. Select () is more compatible, but there is a limit to how many file descriptors it can monitor in a single process. This value is related to FD_SETSIZE, which defaults to 1024 on 32-bit systems and 2048 on 64-bit systems. Another disadvantage of select() is the way it polls. It takes a linear scan polling approach, traversing FD_SETSIZE file descriptors each time, regardless of whether they are active or inactive. Poll () is essentially the same as the implementation of select(), but the data structure is quite different. The user must allocate an array of pollFD structures, which are maintained in the kernel state. Because of this, poll() does not have the same size limit as select(), but its disadvantages are also obvious. A large number of FD arrays are copied between user and kernel states, whether or not this copying makes sense.
An even more efficient implementation than select() and poll() is called epoll(), a scalable I/O multiplexing implementation introduced in Linux kernel 2.6 to replace select() and poll(). Epoll () also has no cap on file descriptors, uses a single file descriptor to manage multiple file descriptors, and uses a red-black tree as a storage structure. In edge-triggered mode, epoll_wait returns only when a new event object is added to epoll for the first time. In horizontal triggering mode, epoll_WAIT is emitted continuously until the event state has changed. That is, edge-triggered mode is notified only once when the file descriptor becomes ready, and horizontal triggered mode is notified continuously until the file descriptor is processed.
For details about epoll_wait, see the following epoll API.
// Create an epoll object and return its file descriptor.
The flags argument allows you to modify the behavior of epoll, which has only one valid value, EPOLL_CLOEXEC.
int epoll_create1(int flags);
// Configures the object that describes which file descriptors and which events are monitored.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// Wait for any events registered with epoll_ctl until the event occurs once or times out.
// Return the events that occurred in events, up to maxEvents.
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
Copy the code
Another advantage of epoll is that it uses an event-driven approach rather than polling. File descriptors registered in epoll_ctl are activated by a callback mechanism when an event is triggered and epoll_wait is notified. The efficiency is not proportional to the number of file descriptors. Epoll also uses MMAP to reduce the data transfer overhead between kernel and user mode.
In Java NIO2 JDK1.7 (start), as long as the Linux kernel version in 2.6 above, will adopt epoll, as shown in the following source code (DefaultSelectorProvider. Java).
public static SelectorProvider create(a) {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
Use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\ \.".0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return newsun.nio.ch.EPollSelectorProvider(); }}catch (NumberFormatException x) {
// format not recognized}}}return new sun.nio.ch.PollSelectorProvider();
}
Copy the code
The signal-driven I/O model uses signals to inform the kernel when data is ready. We first turn on a signal-driven I/O socket and use the SIGAction system call to install the signal handler, which the kernel returns directly without blocking the user mode. When the Datagram is ready, the kernel sends a SIGIO signal and recvFROM sends a system call to begin the I/O operation.
The advantage of this model is that the main process (thread) is not blocked, and when the data is ready, the main process (thread) is notified by the signal handler that it is ready for I/O operations and processing of the data.
The various I/O models we discussed earlier, whether blocking or non-blocking, refer to the data preparation phase. The asynchronous I/O model also relies on signal handlers to notify, but unlike the above I/O models, the asynchronous I/O model notifies that the I/O operation has completed, not that the data is ready to complete.
The asynchronous I/O model is truly non-blocking, with the main process doing its own thing and then calling a callback to do some processing of the data when the I/O is done.
Having rambled on so much, I/O models must already have a deep understanding. After that, we’ll take a look at some of the core components of Netty and how to use Netty, and you’ll see how easy it is to implement a Netty application (with all the high performance and maintainability that comes with it).
This article was written by SylvanasSun([email protected]) and appeared on SylvanasSun’s Blog. The original link: https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ (reprint please be sure to keep this section of the statement, and retain hyperlinks.)
ByteBuf
The basic unit of network transmission is byte. Java NIO provides ByteBuffer as a ByteBuffer container, but the API of this class is not very convenient to use, so Netty implements ByteBuf as its substitute. Here are the advantages of using ByteBuf:
-
Much simpler to use than ByteBuffer.
-
Transparent zero-copy is implemented with built-in compound buffer types.
-
Capacity can grow on demand.
-
Read and write use different index Pointers.
-
Support for chain calls.
-
Support for reference counting and pooling.
-
Can be extended by user-defined buffer types.
Before discussing ByteBuf, we need to take a look at the implementation of ByteBuffer so that we can understand the differences.
ByteBuffer inherits from Abstract class Buffer (so there are other implementations of LongBuffer, IntBuffer, etc.), and is essentially a finite linear sequence of elements containing three important properties.
-
Capacity: The Capacity of the elements in the buffer. You can write only Capacity elements into the buffer. Once the buffer is full, you need to clear the buffer to continue writing data.
-
Position: indicates the index pointer to the next Position where data is written. The initial Position is 0 and the maximum value is Capacity -1. Position needs to be reset to 0 when write mode is switched to read mode.
-
Limit: In write mode, Limit is the maximum index that can be written to the buffer, that is, it is equivalent to the size of the buffer in write mode. In read mode, limit indicates the maximum index of data that can be read.
Since position is the only index pointer maintained in Buffer, its switching between read and write modes requires calling a flip() method to reset the pointer. The process of using Buffer is as follows:
-
Writes data to the buffer.
-
Call the flip() method.
-
Reads data from the buffer
-
Call buffer.clear() or buffer.compact() to clear the buffer for the next write.
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt"."rw");
FileChannel inChannel = aFile.getChannel();
// Allocate a buffer of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); // Read data into the buffer
while(bytesRead ! = -1) {
buf.flip(); // Reset position to 0
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // Reads the data and prints it to the console
}
buf.clear(); // Clear the buffer
bytesRead = inChannel.read(buf);
}
aFile.close();
Copy the code
The implementation of the core method in Buffer is also very simple, mainly operating on the pointer position.
/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark(a) {
mark = position; // The mark attribute is used to mark the current index position
return this;
}
// Reset the current index position to the position marked by mark
public final Buffer reset(a) {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
// Reverse the Buffer, set limit to the current index position, and then reset position to 0
public final Buffer flip(a) {
limit = position;
position = 0;
mark = -1;
return this;
}
// Clear the buffer
Postion and limit are reset, and data written later will overwrite the previous data
public final Buffer clear(a) {
position = 0;
limit = capacity;
mark = -1;
return this;
}
// Return the remaining space
public final int remaining(a) {
return limit - position;
}
Copy the code
The trouble with the Buffer API operation in Java NIO is that the read/write conversion requires a manual reset of the pointer. ByteBuf doesn’t have this hassle, maintaining two different indexes, one for reading and one for writing. When you read data from ByteBuf, its readerIndex is incremented by the number of bytes that have been read, and likewise, when you write data, writerIndex is incremented. The maximum range of readerIndex is at the location of writerIndex, and attempts to move a readerIndex beyond this value will raise an exception.
ByteBuf methods whose names start with read or write will increment their corresponding indexes, while methods whose names start with get or set will not. ByteBuf can also specify a maximum capacity beyond which attempts to move writerIndex will raise an exception.
public byte readByte(a) {
this.checkReadableBytes0(1); // Check if readerIndex is out of bounds
int i = this.readerIndex;
byte b = this._getByte(i);
this.readerIndex = i + 1; / / increment readerIndex
return b;
}
private void checkReadableBytes0(int minimumReadableBytes) {
this.ensureAccessible();
if(this.readerIndex > this.writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s".new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this})); }}public ByteBuf writeByte(int value) {
this.ensureAccessible();
this.ensureWritable0(1); // Check whether writerIndex exceeds capacity
this._setByte(this.writerIndex++, value);
return this;
}
private void ensureWritable0(int minWritableBytes) {
if(minWritableBytes > this.writableBytes()) {
if(minWritableBytes > this.maxCapacity - this.writerIndex) {
throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s".new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
} else {
int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
this.capacity(newCapacity); }}}// Get and set only check the incoming index and then get or set its position
public byte getByte(int index) {
this.checkIndex(index);
return this._getByte(index);
}
public ByteBuf setByte(int index, int value) {
this.checkIndex(index);
this._setByte(index, value);
return this;
}
Copy the code
ByteBuf also supports allocation in and out of the heap. Allocation in the heap, also known as the supported array pattern, provides fast allocation and deallocation without the use of pooling.
ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);
if (heapBuf.hasArray()) { // Check if there is a support array
byte[] array = heapBuf.array();
// Calculate the offset of the first byte
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes(); // Get readable bytes
handleArray(array,offset,length); // Call your handler
}
Copy the code
Another pattern is out-of-heap allocation. The Java NIO ByteBuffer class has allowed JVM implementations since JDK1.4 to allocate memory out of the heap via JNI calls (calling malloc() to allocate memory out of the JVM heap), mainly to avoid additional buffer copy operations.
ByteBuf directBuf = Unpooled.directBuffer(capacity);
if(! directBuf.hasArray()) {int length = directBuf.readableBytes();
byte[] array = new byte[length];
// Copy the bytes into the array
directBuf.getBytes(directBuf.readerIndex(),array);
handleArray(array,0,length);
}
Copy the code
ByteBuf also supports a third pattern, called composite buffers, that provides an aggregated view of multiple BytebuFs. In this view, you can add or remove instances of ByteBuf as needed. The CompositeByteBuf subclass implements this pattern.
A suitable scenario for using composite buffers is HTTP. Messages transmitted over HTTP are divided into two parts — header and body. If these two parts are produced by different modules of the application, they are assembled at message sending and the application also reuses the same body for multiple messages. This creates a new header for each message, resulting in many unnecessary memory operations. Using CompositeByteBuf is a good option, which eliminates this extra replication to help you reuse the messages.
CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = .... ; ByteBuf bodyBuf = .... ; messageBuf.addComponents(headerBuf,bodyBuf);for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
Copy the code
CompositeByteBuf transparently implements zero-copy, which prevents data from being copied back and forth between two memory regions. On the operating system level, zero-copy refers to avoiding data buffer copy between kernel and user mode (avoided by Mmap), while Netty zero-copy is more about optimizing data operation in user mode. Just as the CompositeByteBuf method is used to duplicate multiple ByteBuFs to avoid additional replication, a byte array can be wrapped as a ByteBuf using the wrap() method or divided into multiple ByteBuFs that share the same memory region using the Slice () method of ByteBuf, This is all to optimize memory usage.
So how do you create ByteBuf? In the above code, Unpooled is a Netty utility class for creating and allocating ByteBuf. It is recommended that you use this utility class to create your buffer and do not call the constructor yourself. Commonly used are wrappedBuffer() and copiedBuffer(), which wrap a byte array or ByteBuffer as a ByteBuf, One is to duplicate a new ByteBuf from the byte array passed in and ByteBuffer/ByteBuf.
// Use array.clone() to copy an array
public static ByteBuf copiedBuffer(byte[] array) {
return array.length == 0? EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());
}
// The default is in-heap allocation
public static ByteBuf wrappedBuffer(byte[] array) {
return (ByteBuf)(array.length == 0? EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));
}
// Also provides out-of-heap allocation methods
private static final ByteBufAllocator ALLOC;
public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}
Copy the code
The underlying allocation method is ByteBufAllocator. Netty implements PooledByteBufAllocator and UnpooledByteBufAllocator. The former uses Jemalloc (an implementation of Malloc ()) to allocate memory. ByteBuf is pooled to improve performance. The latter allocates the unpooled ByteBuf in the same way as before.
Channel channel = ... ; ByteBufAllocator allocator = channel.alloc(); ByteBuf buffer = allocator.directBuffer();do something.......
Copy the code
To optimize memory usage, Netty provides a manual way to track inactive objects. Objects such as UnpooledHeapByteBuf that are allocated in the heap benefit from the JVM’s GC management without extra concern, while UnpooledDirectByteBuf is allocated outside the heap. It is internally based on DirectByteBuffer, which allocates a quota to Bits (Bits also has a global variable totalCapacity, which records the total size of all DirectByteBuffers). Each application is checked to see if it has exceeded the limit set by -xx :MaxDirectMemorySize. If it has exceeded the limit, sytem.gc () is attempted to reclaim some memory, and then sleep for 100ms. If it is still out of memory, only an OOM exception is raised. Although off-heap memory collection has such a layer of security, but in order to improve performance and utilization, active collection is also necessary. Since Netty also implements pooling of ByteBuf, the likes of PooledHeapByteBuf and PooledDirectByteBuf must rely on manual recycling (back into the pool).
Netty uses reference counters to keep track of inactive objects. The ReferenceCounted interface for reference counting is ReferenceCounted. The idea is simple: As long as the reference count of a ByteBuf object is greater than 0, it is guaranteed that the object will not be released and reclaimed. The release() and retain() methods can be manually called to decrement or increment the reference count value of the object. Users can also customize a ReferenceCounted implementation class to meet custom rules.
package io.netty.buffer;
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
RefCnt is not wrapped as AtomicInteger because the number of ByteBuf instance objects can be quite large
/ / but use a global AtomicIntegerFieldUpdater refCnt responsible for operation
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// Each ByteBuf has an initial reference value of 1
private volatile int refCnt = 1;
public int refCnt(a) {
return this.refCnt;
}
protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}
public ByteBuf retain(a) {
return this.retain0(1);
}
// Increment increment, increment must be greater than 0
public ByteBuf retain(int increment) {
return this.retain0(ObjectUtil.checkPositive(increment, "increment"));
}
public static int checkPositive(int i, String name) {
if(i <= 0) {
throw new IllegalArgumentException(name + ":" + i + " (expected: > 0)");
} else {
returni; }}// Keep trying to update the value using the CAS operation
private ByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
nextCnt = refCnt + increment;
if(nextCnt <= increment) {
throw newIllegalReferenceCountException(refCnt, increment); }}while(! refCntUpdater.compareAndSet(this, refCnt, nextCnt));
return this;
}
public boolean release(a) {
return this.release0(1);
}
public boolean release(int decrement) {
return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt < decrement) {
throw newIllegalReferenceCountException(refCnt, -decrement); }}while(! refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
if(refCnt == decrement) {
this.deallocate();
return true;
} else {
return false; }}protected abstract void deallocate(a);
}
Copy the code
Channel
Like Java NIO, a Channel in Netty is an abstraction of an entity or connection, but Netty provides a more generic API. In the case of network sockets, OIO and NIO are two very different apis in Java, and if you were using OIO and wanted to switch to NIO, you would have to rewrite almost everything. In Netty, the conversion between OIO and NIO (or whatever) can be accomplished by changing just a few lines of code (changing the implementation classes of Channel and EventLoop, for example, replacing OioServerSocketChannel with NioServerSocketChannel).
Each Channel is ultimately assigned a ChannelPipeline, which holds all the ChannelHandlers responsible for handling inbound and outbound data and events, and a ChannelConfig, which contains all the configuration Settings for the Channel and supports hot updates. Because different transport types may have their own special configurations, the class may be implemented as different subclasses of ChannelConfig.
Channels are thread-safe (related to the thread model later), so you can reuse the same Channel across multiple threads, as shown in the code below.
final Channel channel = ...
final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();
Runnable writer = new Runnable() {
@Override
public void run(a) { channel.writeAndFlush(buffer.duplicate()); }}; Executor executor = Executors.newCachedThreadPool(); executor.execute(writer); executor.execute(writer); .Copy the code
In addition to supporting the usual NIO and OIO, Netty also has other transport types built in.
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | Based on Java NIO |
OIO | io.netty.channel.socket.oio | Java.net based implementation, using the blocking I/O model |
Epoll | io.netty.channel.epoll | Higher performance non-blocking I/O implemented by JNI driven epoll(), which can only be used on Linux |
Local | io.netty.channel.local | Local transport, which communicates through pipes within the JVM |
Embedded | io.netty.channel.embedded | Allows ChannelHandler to be used in an environment that does not require real network transport, mainly for testing ChannelHandler |
NIO, OIO, Epoll we should be familiar with, the following mainly talk about Local and Embedded.
Local transports are used for asynchronous communication between client and server programs running in the same JVM. The SocketAddress associated with the server Channel is not bound to a real physical network address, but is stored in the registry and unregistered when the Channel is closed. So Local transport does not accept real network traffic, which means it cannot interoperate with other transport implementations.
Embedded transports are primarily used to unit test ChannelHandler, the logical component used to process messages, Netty implemented unit testing of the ChannelHandler by writing both inbound and outbound messages to the EmbeddedChannel (providing write/readInbound() and write/readOutbound() to read and write inbound and outbound messages).
ChannelHandler
ChannelHandler acts as a container for the application logic that handles inbound and outbound data. This class is event-driven and calls its associated callback function in response to relevant events. For example, when a new connection is established, The ChannelHandler’s channelActive() method will be called.
Regarding the data flow definition of inbound and outbound messages, data flowing from client to server is referred to as outbound if the client is dominant, and inbound if not.
Inbound events are events that can be triggered by inbound data or related state changes, including: connection activation, connection deactivation, inbound data reading, user events, exceptions, and so on.
An outbound event is an event that will be triggered as a result of some future action, such as opening or closing a connection to a remote node, or writing (or flushing) data to a socket.
The main uses of ChannelHandler include:
-
Business logic processing of inbound and outbound data
-
log
-
Converts data from one format to another to implement codecs. With an HTTP protocol (or other application layer protocol) process, for example, the basic unit of data in the network transmission is byte, when the client sends a request to the server, the server needs through a decoder (process inbound messages) will byte decoding for the message content of the agreement, the server when sending response (outbound messages), You also need to encode the message content into bytes through an encoder.
-
Catch exceptions
-
Provides notifications during a Channel’s life cycle, such as when a Channel is active and when it is inactive
Netty is full of asynchrony and event-driven functions, and callbacks are used to respond to actions after an event. Due to the asynchronous will return a result directly, so the Netty provides ChannelFuture (implements the Java. Util. Concurrent. Future) as a placeholder for asynchronous invocation returns, real results will be done sometime in the Future, It will then be accessible through ChannelFuture, which will return a ChannelFuture for each Netty outbound I/O operation.
Netty also provides the ChannelFutureListener interface to listen for ChannelFuture success and take corresponding actions.
Channel channel = ...
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1".6666));
// Register a listener
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// do something....
} else {
// Output error information
Throwable cause = future.cause();
cause.printStackTrace();
// do something....}}});Copy the code
Several simple default implementations are also provided in the ChannelFutureListener interface for easy use.
package io.netty.channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// Close when the Future is done
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) { future.channel().close(); }};// If it fails, close it
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(! future.isSuccess()) { future.channel().close(); }}};// Pass exception information to the next ChannelHandler
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(! future.isSuccess()) { future.channel().pipeline().fireExceptionCaught(future.cause()); }}}; }Copy the code
The ChannelHandler interface defines callback functions that listen for its lifecycle and are called whenever a ChannelHandler is added to the ChannelPipeline or removed.
package io.netty.channel;
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
/ * *@deprecated* /
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
// This annotation indicates that this ChannelHandler can be reused by other threads
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}
Copy the code
Inbound and outbound messages are the responsibility of their corresponding interfaces ChannelInboundHandler and ChannelOutboundHandler, which define callback functions that listen for state-change events during a Channel’s life cycle.
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public interface ChannelInboundHandler extends ChannelHandler {
// Called when a channel is registered with an EventLoop
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// Called when a channel has been created but is not registered with (or unregistered from) the EventLoop
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// Called when a channel is active (connected to a remote node)
void channelActive(ChannelHandlerContext var1) throws Exception;
// Called when a channel is inactive (not connected to a remote node)
void channelInactive(ChannelHandlerContext var1) throws Exception;
// called when reading data from a channel
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// Called when the last read of a channel has completed
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
. / / when ChannelInboundHandler fireUserEventTriggered () method is invoked is invoked
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// Called when a channel's writable state changes
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// Called when an exception occurs during processing
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
// Called when a request binds a Channel to an address
// ChannelPromise is a subinterface of ChannelFuture that defines methods such as setSuccess(),setFailure(), etc
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// Called when a request is made to connect a Channel to a remote node
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// Called when a request disconnects a Channel from a remote node
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// Called when a request is made to close a Channel
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
Called when the request unlogs a Channel from its EventLoop
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// called when a request reads data from a Channel
void read(ChannelHandlerContext var1) throws Exception;
// called when a request writes data to a remote node through a Channel
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// Called when a request flushes cached data to a remote node through a Channel
void flush(ChannelHandlerContext var1) throws Exception;
}
Copy the code
You can implement ChannelInboundHandler or ChannelOutboundHandler to handle custom application logic, but Netty already does some basic things for you. Users only need to inherit and expand ChannelInboundHandlerAdapter or ChannelOutboundHandlerAdapter as a starting point for custom implementations.
ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter inheritance in ChannelHandlerAdapter, the abstract class implements the simple ChannelHandler interface.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter(a) {}// This method does not allow reuse of this ChannelHandler share
protected void ensureNotSharable(a) {
if(this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared"); }}// Use reflection to determine if the implementation class has @sharable annotations to verify that the class is shareable
public boolean isSharable(a) {
Class clazz = this.getClass();
Map cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if(sharable == null) {
sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));
cache.put(clazz, sharable);
}
return sharable.booleanValue();
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }}Copy the code
ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter default simply pass the request to the next ChannelHandler ChannelPipeline, source code is as follows:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter(a) {}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }}public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter(a) {}public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }}Copy the code
For processing the inbound message, another option is to inherit SimpleChannelInboundHandler, it is an inheritance of Netty ChannelInboundHandlerAdapter abstract class, and on the realized the function of automatic release resources.
We learned from ByteBuf that Netty uses a set of self-implemented reference counting algorithms to proactively release resources, Suppose your ChannelHandler inheritance on or ChannelOutboundHandlerAdapter ChannelInboundHandlerAdapter, you will have the responsibility to manage your allocated ByteBuf, in general, A message object (ByteBuf) has been consumed (or discarded) and will not be passed on to the next handler in the ChannelHandler chain (if the message reaches the actual transport layer, it will be automatically released when it is written or a Channel is closed), so you need to release it manually. This can be done with a simple release method of the utility class ReferenceCountUtil.
// This generic is the type of the message object
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler(a) {
this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
this.matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}
/ / SimpleChannelInboundHandler just for you to do the ReferenceCountUtil release ()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if(this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false; ctx.fireChannelRead(msg); }}finally {
if(this.autoRelease && release) { ReferenceCountUtil.release(msg); }}}// This method is the one we need to implement
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
}
// ReferenceCountUtil, the release method determines the type of the message object and calls its Release () method
public static boolean release(Object msg) {
return msg instanceofReferenceCounted? ((ReferenceCounted)msg).release():false;
}
Copy the code
ChannelPipeline
For modularity and decoupling, it’s impossible for a single ChannelHandler to do all the application logic, so Netty uses a chain of interceptors. A ChannelPipeline is a container that manages the ChannelHandler instance chain. Its job is to keep the instance chain flowing.
Each newly created Channel will be assigned a new Channel Channel. The relationship is permanent, and a Channel can only correspond to one Channel Channel in its lifetime.
When an inbound event is triggered, it propagates from the left-most end of the ChannelPipeline (header) to the right-most end of the ChannelPipeline (tail), with the outbound event propagating in the opposite order (right-most to left-most). The order is deterministic; Netty always uses ChannelPipeline’s inbound port as the head and outbound port as the tail. During event propagation, ChannelPipeline determines whether the type of the next ChannelHandler matches the direction of the event. If not, Skip the ChannelHandler and move on to the next one (ensuring that inbound events are only handled by the ChannelInboundHandler), A ChannelHandler can also implement both ChannelInboundHandler and ChannelOutboundHandler, which is called on both inbound and outbound events.
This interface is the key for ChannelPipeline to connect with ChannelHandler. ChannelHandlerContext can notify the next ChannelHandler of the current ChannelHandler in ChannelPipeline, You can also dynamically change the location of the current ChannelHandler in the ChannelPipeline (by calling various methods in the ChannelPipeline).
ChannelHandlerContext is responsible for ChannelHandler interactions with other ChannelHandlers in the same ChannelPipeline. Each ChannelHandlerContext corresponds to a ChannelHandler. DefaultChannelPipeline source, has been very obvious.
public class DefaultChannelPipeline implements ChannelPipeline {...// Reference variables for the head and tail nodes
ChannelHandlerContext is organized as a linked list in ChannelPipeline
final AbstractChannelHandlerContext head;
finalAbstractChannelHandlerContext tail; .// Add a ChannelHandler to the end of the list
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized(this) {
// Check if ChannelHandler is a shared object (@sharable)
// Throw an exception if the ChannelHandler has no @sharable annotation and is already added
checkMultiplicity(handler);
/ / returns a DefaultChannelHandlerContext, pay attention to the object to hold the incoming ChannelHandler
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
// If the current ChannelPipeline is not registered, add it to the pending list first
if(!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
// Otherwise call handlerAdded() in ChannelHandler
EventExecutor executor = newCtx.executor();
if(! executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() {
public void run(a) {
DefaultChannelPipeline.this.callHandlerAdded0(newCtx); }});return this; }}this.callHandlerAdded0(newCtx);
return this;
}
// Insert the new ChannelHandlerContext between the tail and the node before the tail
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx; }... }Copy the code
ChannelHandlerContext also defines a number of methods that overlap with Channel and ChannelPipeline (such as read(), write(), connect() for outbound methods or inbound methods such as fireChannelXXXX() ), the difference is that calling these methods on a Channel or ChannelPipeline will propagate from the beginning through the entire ChannelHandler instance chain, while calling the same method on the ChannelHandlerContext, Will start with the currently associated ChannelHandler and will only propagate to the next ChannelHandler in the instance chain. Furthermore, movement between events (from one ChannelHandler to the next) is also done through method calls in the ChannelHandlerContext.
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
// Notice that the head node is passed in
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this; }}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext.ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if(executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run(a) { next.invokeChannelRead(m); }}); }}private void invokeChannelRead(Object msg) {
if(this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3); }}else {
// Find the next ChannelHandler
this.fireChannelRead(msg); }}public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(this.findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound(a) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while(! ctx.inbound);// Until a ChannelInboundHandler is found
returnctx; }}Copy the code
EventLoop
To maximize performance and maintainability, Netty has designed a powerful and easy-to-use threading model. The most important capability in a network framework is the ability to quickly and efficiently handle the various events that occur during the lifetime of a connection. The corresponding program constructs are called event loops, and Netty defines the interface EventLoop to do this.
If you’re a regular multithreaded developer in Java, you’re probably going to use the thread pool, the Executor API. Netty extends its EventExecutorGroup (IO.net ty.util.concurrent) from Executor (Java.util.concurrent), and in order to interact with Channel events, The EventLoopGroup interface (IO.net ty.channel) is also extended. EventExecutorXXX under the IO.net ty.util.concurrent package is responsible for the work related to thread concurrency, while EventLoopXXX under the IO.Net ty.channel package is responsible for the work related to network programming (handling events in the channel).
In Netty’s threading model, an EventLoop would be driven by a Thread that would never change, while a Channel would only use one EventLoop in its lifetime (but an EventLoop might be assigned to serve multiple channels), All I/O operations and events in a Channel are handled by threads in the EventLoop, which means that only one thread is used in a Channel’s lifetime. However, in Netty3, only inbound events are handled by EventLoop, and all outbound events are handled by the calling thread, which leads to thread-safety issues with ChannelHandler. Netty4 simplifies the threading model, both solving this problem and providing a simpler architecture by processing all events in the same thread.
package io.netty.channel;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks".2147483647));
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
// Return the EventLoopGroup to which it belongs
public EventLoopGroup parent(a) {
return (EventLoopGroup)super.parent();
}
public EventLoop next(a) {
return (EventLoop)super.next();
}
// Register a Channel, where ChannelPromise is associated with a Channel
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// The rest of these functions are used to schedule tasks
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if(this.isShutdown()) {
reject();
}
if(!this.tailTasks.offer(task)) {
this.reject(task);
}
if(this.wakesUpForTask(task)) {
this.wakeup(this.inEventLoop()); }}final boolean removeAfterEventLoopIterationTask(Runnable task) {
return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
protected boolean wakesUpForTask(Runnable task) {
return! (taskinstanceof SingleThreadEventLoop.NonWakeupRunnable);
}
protected void afterRunningAllTasks(a) {
this.runAllTasksFrom(this.tailTasks);
}
protected boolean hasTasks(a) {
return super.hasTasks() || !this.tailTasks.isEmpty();
}
public int pendingTasks(a) {
return super.pendingTasks() + this.tailTasks.size();
}
interface NonWakeupRunnable extends Runnable {}}Copy the code
To ensure that an EventLoop is responsible for I/O events throughout the life of a Channel, Netty uses the inEventLoop() method to determine the identity of the currently executing thread and whether it is the thread assigned to the current Channel and its EventLoop. If the current thread is the one in EventLoop, the submitted task will be executed directly; otherwise, EventLoop will schedule the task for later execution and place it in an internal task queue (each EventLoop has its own task queue, The source code for SingleThreadEventLoop shows a number of ways to schedule internal task queues), which will execute those tasks in the queue the next time its event is processed. This design allows any thread to interact directly with a Channel without additional synchronization in the ChannelHandler.
For the sake of performance, do not put a task that takes a long time to run into the task queue, because it will affect the execution of other tasks in the queue. The solution is to use a dedicated EventExecutor to execute it (the ChannelPipeline provides the addXXX() method with the EventExecutorGroup parameter, This method binds the ChannelHandler passed in to your EventExecutor, so it will execute in another thread, isolated from other tasks.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {...public void execute(Runnable task) {
if(task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if(inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if(this.isShutdown() && this.removeTask(task)) { reject(); }}if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop); }}}public boolean inEventLoop(Thread thread) {
return thread == this.thread; }... }Copy the code
The EventLoopGroup manages and allocates Eventloops (creating them and assigning them to each newly created Channel) in different ways depending on the transport type. For example, with the NIO transport type, the EventLoopGroup will use fewer Eventloops (one EventLoop serves multiple channels) because NIO is based on I/O multiplexing and one thread can handle multiple connections, whereas with OIO, To create a new Channel, allocate an EventLoop.
Bootstrap
After digging into Netty’s core components, they are modular in design and need to be assembled if you want to implement your own applications. Netty uses the Bootstrap class to configure a Netty application (assemble the components) and finally get it up and running. The Bootstrap classes used by the client program and the server program are different. The latter needs to use ServerBootstrap. This is because in connected protocols such as TCP, the server program usually needs more than one Channel. A parent Channel accepts connections from clients and creates child channels for communication between them, whereas a connectionless protocol like UDP does not require each connection to create a child Channel, just one.
One obvious difference is the Group () method of Bootstrap versus ServerBootstrap, which provides a version that accepts two EventLoopGroups.
AbstractBootstrap < AbstractBootstrap > < AbstractBootstrap > < AbstractBootstrap > < AbstractBootstrap > < AbstractBootstrap >
public B group(EventLoopGroup group) {
if(group == null) {
throw new NullPointerException("group");
} else if(this.group ! =null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this; }}ServerBootstrap, which also supports only one EventLoopGroup
public ServerBootstrap group(EventLoopGroup group) {
return this.group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if(childGroup == null) {
throw new NullPointerException("childGroup");
} else if(this.childGroup ! =null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this; }}Copy the code
Bootstrap is nothing more than an assembler who puts components together and configures them. See Netty JavaDoc for more details about its API. Let’s walk through the process of creating a Netty application using a classic Echo client and server example.
The first thing to implement is the server, and we’ll implement an EchoServerInboundHandler to handle inbound messages.
public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));
WriteAndFlush is not called because the read event does not send the full message in one go
ctx.write(in); // Write the message directly back to the client (this will be handled by the outbound message handler, but our application does not implement any outbound message handlers)
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Flush the buffer of previously written data when the read event has completed
// We then add a listener that closes the Channel when the Future completes.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
// Handle the exception, print the exception information, and close the Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
There is only so much server application logic left to configure with ServerBootstrap.
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start(a) throws Exception {
final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();
EventLoopGroup group = new NioEventLoopGroup(); // The transport type uses NIO
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group) / / configuration EventLoopGroup
.channel(NioServerSocketChannel.class) // Set the Channel type
.localAddress(new InetSocketAddress(port)) // Configure the port number
.childHandler(new ChannelInitializer<SocketChannel>() {
// Implement a ChannelInitializer, which makes it easy to add multiple ChannelHandlers
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); }});// I binds the address and waits synchronously for it to complete
ChannelFuture f = b.bind().sync();
// Close the Future
f.channel().closeFuture().sync();
} finally {
// Close the application. Generally Netty applications only need to call this methodgroup.shutdownGracefully().sync(); }}public static void main(String[] args) throws Exception {
if(args.length ! =1) {
System.err.printf(
"Usage: %s <port> \n",
EchoServer.class.getSimpleName()
);
return;
}
int port = Integer.parseInt(args[0]);
newEchoServer(port).start(); }}Copy the code
To implement the client side, you also need to implement an inbound message handler first.
public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
/** * We connect to the remote node in Channel and send a message directly to the server */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// Outputs messages from the server Echo
System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
Then configure the client.
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start(a) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port)) // Address of the server
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(newEchoClientInboundHandler()); }}); ChannelFuture f = b.connect().sync();// Connect to the server
f.channel().closeFuture().sync();
} finally{ group.shutdownGracefully().sync(); }}public static void main(String[] args) throws Exception {
if(args.length ! =2) {
System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
newEchoClient(host, port).start(); }}Copy the code
Implementing a Netty application is as simple as that. Most users are writing channelhandlers for various application logic (or using various utility channelhandlers built into Netty) and then simply adding them all to the ChannelPipeline.
reference
-
Netty: Home
-
Chapter 6. I/O Multiplexing: The select and poll Functions – Shichao’s Notes
-
epoll(7) – Linux manual page
-
Java NIO