A First Look at Netty
Netty’s official website
Basic concepts
Netty is an open-source Java framework originally provided by JBoss. It is an NIO-based framework for writing client/server network applications. It offers high performance, high scalability, and an asynchronous, event-driven design, which is why so many applications rely on it
With Netty, network server and client applications can be developed quickly
Goals
- Make development “fast and easy”
- While staying “fast and easy”, support not only custom protocols on top of TCP/UDP but also traditional application-layer protocols such as FTP, SMTP, and HTTP
- Achieve high performance and high scalability
Advantages
Netty is widely used: hundreds of distributed middleware products, open-source projects, and commercial projects build on it, including Elasticsearch and Dubbo. This widespread adoption is closely tied to its strengths.
- Provides an asynchronous, event-driven network application framework and tools
- The API is simple to use, so the barrier to entry is low
- Powerful: it ships with a variety of codecs and supports many mainstream protocols
- High performance and high scalability
- Active community, mature tooling, and stable releases
What is asynchronous event-driven
“Asynchronous event-driven” can be broken down into two parts:
- Asynchronous:
Operations are non-blocking: the call returns without waiting for the I/O to complete. Standard IO operations, by contrast, are blocking
- Event-driven:
A Future-Listener mechanism: the result of an I/O operation is delivered through a notification (a callback) instead of the caller waiting for it
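The Future-Listener idea can be illustrated with the JDK's own CompletableFuture. This is only an analogy and not Netty's API: the class name FutureListenerSketch and the slowRead() helper are made up for illustration, and the sleep merely stands in for a slow I/O operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class FutureListenerSketch {
    // Hypothetical stand-in for a slow I/O operation.
    static String slowRead() {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "payload";
    }

    // Starts the operation asynchronously and returns what the listener received.
    static String readWithListener() {
        AtomicReference<String> received = new AtomicReference<>();
        // Asynchronous: supplyAsync returns immediately with a future.
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(FutureListenerSketch::slowRead);
        // Event-driven: the callback runs once the result is available.
        CompletableFuture<Void> done = future.thenAccept(received::set);
        // The caller would normally continue with other work here.
        done.join(); // wait only so that this demo can return the value
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(readWithListener());
    }
}
```

The caller never polls for the result; it registers a callback and is notified, which is the same shape as attaching a listener to a future in Netty.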
Preparation before learning Netty
Before learning Netty, we need to do some preparation. With the groundwork in place, learning Netty becomes much less daunting
- Java NIO
Prior to JDK 1.4, Java IO operations were blocking. To address this, JDK 1.4 introduced a new non-blocking IO library, Java New IO, or NIO for short
- Reactor model
Reactor is the fundamental design pattern of high-performance network programming. Netty’s overall architecture follows the Reactor pattern, so understanding it makes Netty much easier to learn and master
With these two prerequisites in mind, let’s begin the Netty journey, starting with Java NIO
NIO
Java NIO performs I/O through buffers and channels. Standard I/O reads and writes in blocking mode, whereas NIO can read and write in non-blocking mode, which lets a small number of threads serve many connections and is generally more efficient for that kind of workload
So, how does NIO do that? Let’s look at its IO model
NIO’s IO model: the IO multiplexing model
What is the IO multiplexing model
A socket is blocking by default, but on Linux it can be set to non-blocking mode. Reading and writing a non-blocking socket is called synchronous non-blocking I/O, and a read call behaves in one of two ways:
- If there is no data in the kernel buffer, the system call returns immediately with a result indicating that nothing could be read
- If there is data in the kernel buffer, the call blocks while the data is copied from the kernel buffer to the user process buffer. Once the copy completes, the system call returns successfully and the application begins processing the data in user space
As a result, the user thread has to keep polling until data arrives before it can read anything. The disadvantage of this approach is obvious:
- Constantly polling the kernel consumes a lot of CPU time and is inefficient
To avoid this polling in synchronous non-blocking IO, the IO multiplexing model was introduced
The IO multiplexing model adds a new system call that queries IO readiness. With it, a single process can monitor multiple file descriptors. As soon as one of them becomes ready, the kernel reports the ready state to the application, which then performs the corresponding IO system call
Implementations of the IO multiplexing model
- select
- epoll
Basic components
Now let’s move on to NIO itself.
NIO is made up of three core components
- Buffer
The buffer is the main area through which an application exchanges data with a Channel
- Channel
A channel works like a combination of an input stream and an output stream: it can be both read from and written to
- Selector
The selector queries IO events: it checks whether a Channel’s IO events are ready. The Selector and the Channel are in a monitoring/monitored relationship
Let’s look at buffers first
Buffer
A buffer is essentially a block of memory to which data can be written and from which data can be read
Important attributes
- capacity
The size of the Buffer’s internal storage. If a write would exceed the capacity, nothing more is written and an exception is thrown: java.nio.BufferOverflowException
- position
The meaning of position depends on whether the buffer is being written or read.
In write mode:
- On entering write mode, position=0
- position += 1 for each element written to the buffer
- When position>=limit, there is no space left to write
In read mode:
- On entering read mode, position=0
- position += 1 for each element read
- When position>=limit, no more data can be read
Switching between read and write mode involves both position and limit
- limit
The upper bound for read and write operations. In write mode, limit = capacity
In read mode, limit = the value position had in write mode just before the switch
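The way the three attributes move can be traced with a few lines of code. This is a minimal sketch using ByteBuffer; the class name BufferStateSketch and the state() helper are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;

final class BufferStateSketch {
    // Formats the three attributes discussed above.
    static String state(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit() + " capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        System.out.println(state(buffer)); // position=0 limit=8 capacity=8

        // Write mode: every put advances position by one.
        buffer.put((byte) 1).put((byte) 2).put((byte) 3);
        System.out.println(state(buffer)); // position=3 limit=8 capacity=8

        // Switch to read mode: limit takes the old position, position goes back to 0.
        buffer.flip();
        System.out.println(state(buffer)); // position=0 limit=3 capacity=8

        // Read mode: every get advances position by one.
        buffer.get();
        System.out.println(state(buffer)); // position=1 limit=3 capacity=8
    }
}
```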
The Buffer class is not thread-safe
Buffer is an abstract class in java.nio. Its subclasses correspond to Java’s primitive data types (except boolean), and a heap buffer is backed internally by an array of the corresponding type.
DoubleBuffer
DoubleBuffer buffer = DoubleBuffer.allocate(100); // Create a buffer with a capacity of 100 doubles
Trace its source code:
public static DoubleBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapDoubleBuffer(capacity, capacity);
}
//-------------HeapDoubleBuffer----------------
HeapDoubleBuffer(int cap, int lim) {
    super(-1, 0, lim, cap, new double[cap], 0);
}
//---------------DoubleBuffer---------------
DoubleBuffer(int mark, int pos, int lim, int cap,
             double[] hb, int offset)
{
    super(mark, pos, lim, cap);
    this.hb = hb;
    this.offset = offset;
}
Other subclasses
- ByteBuffer: the most widely used Buffer, and the one used in the examples below
- CharBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer
A ByteBuffer subclass used specifically for memory-mapped files
These buffers are created the same way (apart from MappedByteBuffer, which is obtained through FileChannel.map()), so instead of repeating examples, here are some important concepts
I’d guess a lot of people will think of StringBuffer here. Despite the similar name, it is unrelated
Important methods
allocate
Creates a Buffer object and allocates its memory. A newly created Buffer is in write mode by default.
I’m going to use ByteBuffer here
ByteBuffer buffer = ByteBuffer.allocate(100);
show(buffer);
//position:0 capacity:100 limit:100
// Helper that prints the buffer's current state; it is reused in the snippets below
private static void show(ByteBuffer buffer) {
    System.out.print("position:" + buffer.position());
    System.out.print("\t");
    System.out.print("capacity:" + buffer.capacity());
    System.out.print("\t");
    System.out.println("limit:" + buffer.limit());
    System.out.println("-----------------");
}
position is 0 and limit equals the capacity, which shows that the buffer is in write mode
put
Writes data to the Buffer
buffer.put("hello world".getBytes());
// Call show(buffer) again to see how position changes
//position:11 capacity:100 limit:100
position becomes 11 (the length of "hello world" in bytes), and everything else stays the same
flip
flip() switches the buffer from write mode to read mode
buffer.flip();
// Call show(buffer) again to see how position and limit change
//position:0 capacity:100 limit:11
Comparing the state before and after the flip: limit takes the value position had before the flip, and position is reset to 0. When position>=limit, no more data can be read
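So how do you switch back to write mode?
clear/compact
Both methods switch the buffer from read mode to write mode:
clear(): resets position to 0 and limit to capacity. The data is not erased, but any unread data will be overwritten by subsequent writes
compact(): copies the unread data to the beginning of the buffer and sets position just after it, so new writes are appended behind the unread data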
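The difference between the two methods shows up when some data is still unread. The following sketch reads two of five bytes and then switches back to write mode both ways; ClearVsCompactSketch and positionAfter() are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ClearVsCompactSketch {
    // Reads two of five bytes, switches back to write mode, and returns the new position.
    static int positionAfter(boolean useCompact) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII)); // position=5
        buffer.flip();  // read mode: position=0, limit=5
        buffer.get();   // reads 'h'
        buffer.get();   // reads 'e'; three bytes ("llo") are still unread
        if (useCompact) {
            buffer.compact(); // "llo" is moved to the front; position=3, limit=16
        } else {
            buffer.clear();   // position=0, limit=16; "llo" will be overwritten
        }
        return buffer.position();
    }

    public static void main(String[] args) {
        System.out.println("after clear():   position=" + positionAfter(false)); // 0
        System.out.println("after compact(): position=" + positionAfter(true));  // 3
    }
}
```

As a rule of thumb, clear() fits when everything has been consumed, while compact() fits when a partial message must be kept until the rest arrives.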
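get
Once in read mode, you can start reading data from the buffer; position increases by 1 for each byte read
buffer.get();
To access the underlying data as an array, call
buffer.array()
Note that array() returns the entire backing array (all capacity elements) regardless of position and limit, and it is only available for array-backed buffers
rewind
If the data has been read and needs to be read again, call rewind(), which resets position to 0 and leaves limit unchanged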
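To read exactly the data that was written, rather than the whole backing array, a bulk get() sized by remaining() can be used, and rewind() allows the same data to be read a second time. A minimal sketch, with RewindSketch, readAll() and readTwice() as hypothetical helper names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RewindSketch {
    // Reads everything between position and limit into a String.
    static String readAll(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes); // advances position up to limit
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Reads the same content twice, using rewind() in between.
    static String[] readTwice() {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        buffer.put("hello".getBytes(StandardCharsets.UTF_8));
        buffer.flip();                   // read mode: position=0, limit=5
        String first = readAll(buffer);  // position is now 5
        buffer.rewind();                 // position back to 0, limit still 5
        String second = readAll(buffer);
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] result = readTwice();
        System.out.println(result[0] + " / " + result[1]); // hello / hello
    }
}
```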
Summary of the steps for using a Buffer
- Call allocate to create an instance, which starts in write mode
- Call put to write data
- Call flip to switch to read mode
- Call get to read the data
- If you need to write more data after reading, call clear/compact to switch back to write mode
Buffer operations revolve around changes to position and limit; the source code of each method shows exactly how
Channel
As mentioned above, Java NIO performs I/O through buffers and channels. In NIO, you can think of each connection as a channel
Channel is one of NIO’s core components, and there are different channel implementations for different transport protocols
NIO’s I/O operations essentially operate on buffers: data is read from a channel into a buffer and written from a buffer to a channel
Let’s go through them one by one
FileChannel
Usage
A file channel is a channel for working with files. It can read data from a file and write data to a file.
FileChannel is a blocking channel and cannot be set to non-blocking mode
Obtaining the channel
// Get a read channel from an input stream
FileChannel fisChannel = new FileInputStream("").getChannel();
// Get a write channel from an output stream
FileChannel fosChannel = new FileOutputStream("").getChannel();
// Get a read/write channel from a random access file
FileChannel accChannel = new RandomAccessFile("", "rw").getChannel();
The kind of stream determines what the channel can do: a channel from a FileInputStream is read-only, one from a FileOutputStream is write-only, and one from a RandomAccessFile opened with "rw" supports both
Reading data
NIO’s I/O operations are essentially buffer operations. Keep this in mind: reading data from a channel means writing it into the Buffer, so the Buffer must be in write mode here
// Create a ByteBuffer with a capacity of 1024
ByteBuffer buffer = ByteBuffer.allocate(1024);
// A new Buffer is already in write mode, so no switch is needed
// read() on the channel returns the number of bytes read, or -1 at end of file
int len = fisChannel.read(buffer);
Writing data
After reading the data, we want to write it to the target file. We can do this:
// Switch the buffer to read mode
buffer.flip();
// Write to the target file; write() returns the number of bytes written
int len = fosChannel.write(buffer);
// Switch the buffer back to write mode
buffer.clear();
Why do I need to switch the Buffer mode here?
To write data to a file, the output channel goes through two steps:
- First, data is read from the Buffer
- Then it is output to the file
Because the channel reads from the Buffer, the Buffer has to be in read mode, hence flip(). The same reasoning applies to calling clear() afterwards: the next read from the input channel writes into the Buffer again.
Forcing data to disk
Writing a buffer to a channel is ultimately carried out by the operating system, which, for performance reasons, may cache the data rather than write it to disk immediately. To make sure the data has actually reached the disk, call force() on the channel
fosChannel.force(true);
Closing the channel
As with streams, channels need to be closed when you are done with them
fisChannel.close();
fosChannel.close();
Comprehensive example: copying a file
Let’s do a complete example using FileChannel:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class CopyFileByFileChannel {
    static ByteBuffer buffer = ByteBuffer.allocate(1024);
    public static void main(String[] args) {
        copy_file();
    }
    private static void copy_file() {
        FileChannel fisChannel = null;
        FileChannel fosChannel = null;
        FileInputStream fis = null;
        FileOutputStream fos = null;
        try {
            fis = new FileInputStream("D:\\work\\web\\study-netty\\src\\main\\java\\top\\zopx\\study\\nio\\CopyFileByFileChannel.java");
            fos = new FileOutputStream("D:\\work\\web\\study-netty\\src\\main\\java\\top\\zopx\\study\\nio\\CopyFileByFileChannel.txt");
            fisChannel = fis.getChannel();
            fosChannel = fos.getChannel();
            // read() writes into the buffer and returns -1 at end of file
            while (fisChannel.read(buffer) != -1) {
                buffer.flip();
                // write() may not drain the buffer in one call, so loop until it is empty
                while (buffer.hasRemaining()) {
                    fosChannel.write(buffer);
                }
                buffer.clear();
            }
            fosChannel.force(true);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Close the channels first, then the underlying streams
                if (null != fisChannel) { fisChannel.close(); }
                if (null != fosChannel) { fosChannel.close(); }
                if (null != fis) { fis.close(); }
                if (null != fos) { fos.close(); }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
In fact, NIO also provides a more convenient way to copy files. Here’s the key code
long size = fisChannel.size();
long pos = 0;
long count = 0;
while (pos < size) {
    // Transfer at most 1024 bytes per call
    count = size - pos > 1024 ? 1024 : size - pos;
    // transferFrom() returns the number of bytes actually transferred
    pos += fosChannel.transferFrom(fisChannel, pos, count);
}
This way we don’t have to create a Buffer or switch its mode ourselves
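For a runnable version of the same idea, here is a small sketch that uses the sibling method transferTo() together with FileChannel.open(). The temporary files and the names TransferCopySketch, copy() and demo() are assumptions made for the demonstration and are not part of the original example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferCopySketch {
    // Copies source to target channel-to-channel, without a user-space ByteBuffer.
    static long copy(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                // transferTo() may move fewer bytes than requested, so loop.
                pos += in.transferTo(pos, size - pos, out);
            }
            return pos;
        }
    }

    // Copies a temporary file and returns the content of the copy.
    static String demo() {
        try {
            Path source = Files.createTempFile("nio-copy-src", ".txt");
            Path target = Files.createTempFile("nio-copy-dst", ".txt");
            try {
                Files.write(source, "hello nio".getBytes(StandardCharsets.UTF_8));
                copy(source, target);
                return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(source);
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The try-with-resources statement closes both channels automatically, which replaces the manual finally block from the longer example.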
SocketChannel
A connection-oriented TCP channel used for transferring data over the network. A SocketChannel can run in blocking or non-blocking mode, configured as follows
Non-blocking mode
socketChannel.configureBlocking(false);
In blocking mode, a SocketChannel behaves and performs just like a Socket under standard I/O, so blocking mode is not used here
So, how do we get an instance of SocketChannel?
Getting an instance
SocketChannel socketChannel = SocketChannel.open();
// Non-blocking mode
socketChannel.configureBlocking(false);
Connecting to the server
public static final String HOST = "127.0.0.1";
public static final int PORT = 36589;
socketChannel.connect(new InetSocketAddress(HOST, PORT));
// Spin until the connection is established
while (!socketChannel.finishConnect()) {}
Connecting to the server only takes a call to connect(). In non-blocking mode, however, connect() returns immediately, whether or not the connection has been established yet, so you need to spin on finishConnect() to determine whether the socketChannel is actually connected to the server
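The connect-then-spin sequence can be tried without an external server. The sketch below is an illustration under assumptions: it opens a throwaway ServerSocketChannel on an ephemeral loopback port purely so that there is something to connect to, and the names NonBlockingConnectSketch and demo() are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectSketch {
    // Connects a non-blocking client to a local listener and reports whether it connected.
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();

            try (SocketChannel client = SocketChannel.open()) {
                client.configureBlocking(false);
                // connect() returns at once; false means the handshake is still in progress.
                boolean connected = client.connect(address);
                while (!connected) {
                    Thread.yield(); // other work could be done here
                    connected = client.finishConnect();
                }
                return client.isConnected();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

Spinning burns CPU while waiting; registering the channel for OP_CONNECT with a Selector avoids that and is the usual choice once more than one connection is involved.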
Working with data
Once connected to the server, reading and writing data follows the same Buffer workflow shown earlier, so it is not repeated here; the full example later on covers it
Closing the channel gracefully
Before closing a SocketChannel, it is recommended to send an end-of-stream signal to the server with shutdownOutput(), and then close the channel
socketChannel.shutdownOutput();
socketChannel.close();
ServerSocketChannel
The server-side channel: a connection-oriented channel that listens for incoming connections. Like SocketChannel, it supports blocking and non-blocking modes, configured in the same way
Non-blocking mode
server.configureBlocking(false);
Getting an instance
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
Binding a port
server.bind(new InetSocketAddress(36589));
Closing the channel
server.close();
Reading and writing data work the same way as with SocketChannel. A complete ServerSocketChannel demo, however, needs a Selector
DatagramChannel
UDP connectionless transport protocol datagram channel, divided into blocking and non-blocking mode, configuration mode
Non-blocking mode
open.configureBlocking(false);
Copy the code
For instance
open=DatagramChannel.open();
open.configureBlocking(false);
Copy the code
So without further ado, standard writing
Binding port
open.bind(new InetSocketAddress(52485));
Copy the code
Read the data
Instead of reading data through the read() method:
open.receive(buffer);
Copy the code
To send data
Instead of writing (), data is sent:
open.send(buffer,newInetSocketAddress())
Copy the code
Second parameter: the client you want to send to
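The bind/send/receive calls above can be exercised in a single process over the loopback interface. This is a sketch under assumptions: both ends live in one JVM, the receiver uses an ephemeral port, the polling loop gives up after two seconds, and DatagramLoopbackSketch and demo() are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

final class DatagramLoopbackSketch {
    // Sends one datagram to a local receiver and returns the text that arrived.
    static String demo() {
        try (DatagramChannel receiver = DatagramChannel.open();
             DatagramChannel sender = DatagramChannel.open()) {
            receiver.configureBlocking(false);
            receiver.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketAddress target = receiver.getLocalAddress();

            sender.send(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)), target);

            ByteBuffer buffer = ByteBuffer.allocate(64);
            SocketAddress from = null;
            long deadline = System.nanoTime() + 2_000_000_000L; // stop polling after 2 seconds
            while (from == null && System.nanoTime() < deadline) {
                // In non-blocking mode receive() returns null when nothing has arrived yet.
                from = receiver.receive(buffer);
                if (from == null) {
                    Thread.yield();
                }
            }
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```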
Comprehensive example: clients chatting with each other
Here is a small example of how to use DatagramChannel
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Scanner;
public class DatagramOpenChannel {
    public static void main(String[] args) {
        int port = getPort();
        datagram_open_channel(port);
    }
    private static int getPort() {
        System.out.println("Please enter the port number you want to bind:");
        Scanner scanner = new Scanner(System.in);
        return scanner.nextInt();
    }
    private static void datagram_open_channel(int port) {
        DatagramChannel open = null;
        try {
            open = DatagramChannel.open();
            open.configureBlocking(false);
            open.bind(new InetSocketAddress(port));
            read(open);
            send(open);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != open) {
                try {
                    open.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    // Reads "port@msg" lines from the console and sends msg to that local port
    private static void send(DatagramChannel open) throws IOException {
        System.out.println("Input content format: port@msg");
        Scanner scanner = new Scanner(System.in);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (scanner.hasNext()) {
            String next = scanner.next();
            if (next.contains("@")) {
                String[] split = next.split("@");
                int port = Integer.parseInt(split[0]);
                String msg = split[1];
                buffer.put(msg.getBytes());
                buffer.flip();
                open.send(buffer, new InetSocketAddress("127.0.0.1", port));
                buffer.clear();
            }
        }
    }
    // Polls for incoming datagrams on a background thread and prints them
    private static void read(DatagramChannel open) throws IOException {
        new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                try {
                    SocketAddress receive = open.receive(buffer);
                    if (null != receive) {
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, buffer.limit()));
                        buffer.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
Test method: start two clients at the same time and send data in the specified format
This is only a small example and many checks are missing; feel free to improve on it
Selector
The Selector is a key player among the NIO components. So what is a selector?
Concept
As mentioned earlier, NIO is based on the IO multiplexing model, and the Selector is what implements the multiplexing: it queries IO readiness, and through a single Selector you can monitor the IO state of multiple channels at the same time.
Note that selectors only work with non-blocking channels, so FileChannel cannot be used with one
IO event types
An IO event is a ready state of an I/O operation on a channel: it indicates that the channel is ready to perform that operation, which matches the IO multiplexing model
OP_READ
The channel has data to read and is read-ready
OP_WRITE
The channel is ready to accept data to be written and is write-ready
OP_CONNECT
The channel has completed its connection handshake with the peer and is connect-ready
OP_ACCEPT
The server channel has detected a new incoming connection and is accept-ready
Using a Selector
Getting a selector instance
Selector selector = Selector.open();
Registering a channel with the selector
// Register with the selector and declare interest in the accept-ready event
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Two things to be careful about:
- A channel registered with a selector must be in non-blocking mode
- Not every channel supports all four IO events. For example, a server channel supports only OP_ACCEPT, while a client channel does not support OP_ACCEPT
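Which events a given channel type supports can be checked at runtime through validOps(), which returns a bit mask of the supported operations. A minimal sketch, with ValidOpsSketch, supports() and demo() as hypothetical names:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ValidOpsSketch {
    // True if the channel type supports the given operation bit.
    static boolean supports(SelectableChannel channel, int op) {
        return (channel.validOps() & op) != 0;
    }

    static boolean[] demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open();
             DatagramChannel datagram = DatagramChannel.open()) {
            return new boolean[] {
                supports(server, SelectionKey.OP_ACCEPT),  // true
                supports(server, SelectionKey.OP_READ),    // false
                supports(client, SelectionKey.OP_ACCEPT),  // false
                supports(client, SelectionKey.OP_CONNECT), // true
                supports(datagram, SelectionKey.OP_READ)   // true
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (boolean supported : demo()) {
            System.out.println(supported);
        }
    }
}
```

Registering a channel for an operation outside its validOps() set makes register() throw an IllegalArgumentException.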
Polling for the IO ready events of interest
while (selector.select() > 0) {
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the key from the selected set so it is not processed again
        iterator.remove();
        if (key.isAcceptable()) {
            System.out.println("A new connection has arrived");
        } else if (key.isConnectable()) {
            System.out.println("Connection ready");
        } else if (key.isReadable()) {
            System.out.println("Ready to read");
        } else if (key.isWritable()) {
            System.out.println("Ready to write");
        }
    }
}
Key points:
- After obtaining a selected key, you must remove it from the selected-key set; otherwise the same event will be processed again on the next pass
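The register/select/iterate/remove cycle can be run end to end in one process by using a java.nio.channels.Pipe as the monitored channel. This choice is an assumption made to keep the sketch self-contained (no network involved); SelectorPipeSketch and demo() are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorPipeSketch {
    // Writes to one end of a pipe and lets a Selector report the other end as readable.
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // Only non-blocking channels can be registered.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap("ready".getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                if (selector.select(2000) > 0) { // wait up to 2 seconds
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove(); // avoid handling the same key twice
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(64);
                            ((Pipe.SourceChannel) key.channel()).read(buffer);
                            buffer.flip();
                            received.append(StandardCharsets.UTF_8.decode(buffer));
                        }
                    }
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```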
Improving the earlier example
Take the DatagramChannel example.
Let’s modify it:
- Bind a selector
Selector selector = Selector.open();
// DatagramChannel is connectionless, so we register for the read-ready event directly
open.register(selector, SelectionKey.OP_READ);
- Poll the IO ready events of interest; the main change is to the read() method
private static void read(Selector selector) throws IOException {
    new Thread(() -> {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            // select() blocks until at least one registered channel is ready
            while (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        DatagramChannel open = (DatagramChannel) key.channel();
                        SocketAddress receive = open.receive(buffer);
                        if (null != receive) {
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, buffer.limit()));
                            buffer.clear();
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}
File transfer from client to server
SocketChannel and ServerSocketChannel were not demonstrated above, so this example focuses on them
The example is fairly complex, so take your time digesting it
First, the requirements:
The client uploads a chosen file to the server, and the server saves it to a specified folder
Preparation
// Imports used by the three parts of this example
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
class NioSocket {
    static final String HOST = "127.0.0.1";
    static final int PORT = 23356;
    static final int BUFFER_CAPACITY = 1024;
    static final Charset CHARSET = StandardCharsets.UTF_8;
}
// Public fields are used here for simplicity
class ReceiverFile {
    public String fileName;
    public long length;
    public FileChannel outChannel;
}
The client
class SocketDemo {
    private static String UPLOAD_FILE = "";
    public static void main(String[] args) {
        send_file();
    }
    private static void send_file() {
        changeUploadFile();
        File file = new File(UPLOAD_FILE);
        if (!file.exists()) {
            System.out.println("File does not exist");
            return;
        }
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress(
                    NioSocket.HOST,
                    NioSocket.PORT
            ));
            while (!socketChannel.finishConnect()) {
                // In non-blocking mode, spin until the connection to the server is established
                // There are other things you can do here
            }
            System.out.println("Successfully connected to the server");
            ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);
            ByteBuffer encode = NioSocket.CHARSET.encode(file.getName());
            // Send the file name length in bytes.
            // Use encode.remaining(): encode.capacity() can be larger than the encoded name,
            // and String.length() counts chars rather than bytes
            buffer.putInt(encode.remaining());
            // buffer.flip();
            // socketChannel.write(buffer);
            // buffer.clear();
            System.out.printf("File name length sent: %s \n", encode.remaining());
            // Send the file name
            buffer.put(encode);
            // socketChannel.write(encode);
            System.out.printf("File name sent: %s \n", file.getName());
            // Send the file size
            buffer.putLong(file.length());
            // buffer.flip();
            // socketChannel.write(buffer);
            // buffer.clear();
            System.out.printf("File size sent: %s \n", file.length());
            // Send the file content; the first write also carries the header put above
            int len = 0;
            long progress = 0;
            FileChannel fileChannel = new FileInputStream(file).getChannel();
            while ((len = fileChannel.read(buffer)) > 0) {
                buffer.flip();
                // A non-blocking write may send only part of the buffer, so loop until it is drained
                while (buffer.hasRemaining()) {
                    socketChannel.write(buffer);
                }
                buffer.clear();
                progress += len;
                // Multiply first so that integer division does not truncate to 0
                System.out.println("Upload progress:" + (progress * 100 / file.length()) + "%");
            }
            // Sending complete, close everything
            if (len == -1) {
                fileChannel.close();
                socketChannel.shutdownOutput();
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void changeUploadFile() {
        System.out.println("Please enter the full path to the file you want to upload.");
        Scanner scanner = new Scanner(System.in);
        UPLOAD_FILE = scanner.next();
    }
}
The commented-out lines are from a problem I ran into during testing:
If the header fields are sent as separate writes, the server may read only part of the header and fail with the following exception
Exception in thread "main" java.nio.BufferUnderflowException
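One common way to avoid this exception on the receiving side is to accumulate bytes and parse the header only once all of it has arrived, using mark()/reset() and compact(). The sketch below simulates a header (name length, name, file size) arriving in two fragments. It is an illustration of the technique, not the approach used in this article's server, and HeaderDecoderSketch, Header, tryDecode() and demo() are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class HeaderDecoderSketch {
    static final class Header {
        final String fileName;
        final long fileLength;

        Header(String fileName, long fileLength) {
            this.fileName = fileName;
            this.fileLength = fileLength;
        }
    }

    // Expects a buffer in read mode. Returns null, with position unchanged, if the header is incomplete.
    static Header tryDecode(ByteBuffer buffer) {
        if (buffer.remaining() < Integer.BYTES) {
            return null;
        }
        buffer.mark();
        int nameLength = buffer.getInt();
        if (buffer.remaining() < nameLength + Long.BYTES) {
            buffer.reset(); // undo the getInt() and wait for more bytes
            return null;
        }
        byte[] name = new byte[nameLength];
        buffer.get(name);
        long fileLength = buffer.getLong();
        return new Header(new String(name, StandardCharsets.UTF_8), fileLength);
    }

    // Simulates a 17-byte header arriving as a 6-byte fragment followed by the rest.
    static Header demo() {
        byte[] name = "a.txt".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(64);
        wire.putInt(name.length).put(name).putLong(1234L);
        wire.flip();

        ByteBuffer accumulated = ByteBuffer.allocate(64); // stays in write mode between reads
        Header header = null;
        int[] fragmentSizes = {6, wire.limit() - 6};
        for (int size : fragmentSizes) {
            for (int i = 0; i < size; i++) {
                accumulated.put(wire.get()); // stands in for socketChannel.read(accumulated)
            }
            accumulated.flip();
            header = tryDecode(accumulated);
            accumulated.compact(); // keep unread bytes for the next round
        }
        return header;
    }

    public static void main(String[] args) {
        Header header = demo();
        System.out.println(header.fileName + " " + header.fileLength); // a.txt 1234
    }
}
```

In a real server the accumulating buffer would be kept per connection, for example as the SelectionKey attachment, so that it survives between readiness events.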
The server
class ServerSocketDemo {
    private static String UPLOAD_SAVE_PATH = "D:\\works\\111";
    private static final Map<SelectableChannel, ReceiverFile> MAP = new ConcurrentHashMap<>();
    public static void main(String[] args) {
        receive_file();
    }
    private static void receive_file() {
        getUploadSavePath();
        // Server-side logic
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            // Bind the port
            serverSocketChannel.bind(
                    new InetSocketAddress(
                            NioSocket.PORT
                    )
            );
            // Register with the selector
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // Poll for ready events
            while (selector.select() > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    // Determine the event
                    if (key.isAcceptable()) {
                        accept(key, selector);
                    } else if (key.isReadable()) {
                        processData(key);
                    }
                }
            }
            selector.close();
            serverSocketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void processData(SelectionKey key) throws IOException {
        ReceiverFile receiverFile = MAP.get(key.channel());
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);
        int len = 0;
        while ((len = socketChannel.read(buffer)) > 0) {
            buffer.flip();
            if (receiverFile.fileName == null) {
                // Simplification: this demo assumes the whole header (name length, name, file size)
                // arrives in the first read; a robust server would accumulate partial headers
                // File name: check remaining() (bytes actually received), not capacity()
                if (buffer.remaining() < 4) {
                    continue;
                }
                int fileNameLength = buffer.getInt();
                byte[] fileNameArr = new byte[fileNameLength];
                buffer.get(fileNameArr);
                String fileName = new String(fileNameArr, NioSocket.CHARSET);
                System.out.println("File name:" + fileName);
                receiverFile.fileName = fileName;
                // Prepare the target file
                File dir = new File(UPLOAD_SAVE_PATH);
                if (!dir.exists()) {
                    dir.mkdir();
                }
                File file = new File((UPLOAD_SAVE_PATH + File.separator + fileName).trim());
                if (!file.exists()) {
                    file.createNewFile();
                }
                receiverFile.outChannel = new FileOutputStream(file).getChannel();
                // File size
                if (buffer.remaining() < 8) {
                    continue;
                }
                long fileLength = buffer.getLong();
                System.out.println("File size:" + fileLength);
                receiverFile.length = fileLength;
                // File content: whatever follows the header in this read
                receiverFile.outChannel.write(buffer);
            } else {
                // File content
                receiverFile.outChannel.write(buffer);
            }
            buffer.clear();
        }
        if (len == -1) {
            // The client has finished sending: close the file and the connection,
            // otherwise the selector keeps reporting this channel as readable
            if (receiverFile.outChannel != null) {
                receiverFile.outChannel.close();
            }
            MAP.remove(socketChannel);
            socketChannel.close();
        }
    }
    private static void accept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel accept = channel.accept();
        accept.configureBlocking(false);
        accept.register(selector, SelectionKey.OP_READ);
        // Associate the channel with the file it is receiving
        ReceiverFile receiverFile = new ReceiverFile();
        MAP.put(accept, receiverFile);
    }
    private static void getUploadSavePath() {
        System.out.println("Please enter the directory where you want to save the file:");
        Scanner scanner = new Scanner(System.in);
        UPLOAD_SAVE_PATH = scanner.next();
    }
}
That wraps up the key points of NIO. As you can see, although there are many of them, none is especially complicated; working through them one at a time, as laid out above, makes them easy to master