Remote communication — Buffer
Objective: To introduce the implementation logic of Buffer and the source code parsing in dubo-remoting-API Buffer package.
preface
The cache is very important in the NIO framework as a byte container, and each NIO framework has its own design implementation. For example, Java NIO has a ByteBuffer design, Mina has an IoBuffer design and Netty4 has a ByteBuf design. So, in this article, I will cover some of the interface definitions that Dubbo has done for buffers, and different frameworks have been made to implement common logic for buffers. Here is the class diagram for this article:
I’m going to go through the class diagram one by one, ignoring the Test class.
Source code analysis
(a) ChannelBuffer
The Comparable interface is a cousin of the Comparable interface. Io.net ty.buffer.ByteBuf has the same method definition and design as ByteBuf. So I won’t go into details.
(2) AbstractChannelBuffer
This class implements the ChannelBuffer interface and is an abstract class of channel cache. It implements all ChannelBuffer methods, but the methods it implements are methods that need to be overridden, and the specific implementation needs to be implemented by subclasses. Now let’s make up for the principle of channel caching, which is inseparable from netty’s ByteBuf principle. AbstractChannelBuffer maintains two indexes, one for reading and one for writing. When you read from the channel cache, readerIndex will increment the number of bytes that have been read, and writerIndex will increment when you write.
/** * read index */
private int readerIndex;
/** * write index */
private int writerIndex;
/** * mark read index */
private int markedReaderIndex;
/** * write index */
private int markedWriterIndex;
Copy the code
You can see that the class has four attributes. The read index and write index are used as I described above. The read index and write index start at index position 0. While the mark read index and mark write index are used for backup rollback. When the buffer is read or written, the previous operation may need to be rolled back, so we need to back up the current read or write index to the corresponding mark index.
Other methods of this class are using four attributes to operate, nothing more than to check whether there is data to read or whether there is space to write methods, do some preconditions check and index Settings, the specific implementation is the need to subclass to achieve, so I will not post code, because the logic is relatively simple.
(3) DynamicChannelBuffer
This class inherits from the AbstractChannelBuffer class, which is a dynamic channel buffer class. That is, this class generates buffers dynamically from the ChannelBufferFactory factory, the default factory used is HeapChannelBufferFactory.
Properties and constructors
/** * channel cache factory */
private final ChannelBufferFactory factory;
/** * channel cache */
private ChannelBuffer buffer;
public DynamicChannelBuffer(int estimatedLength) {
// The default is HeapChannelBufferFactory
this(estimatedLength, HeapChannelBufferFactory.getInstance());
}
public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
// Throw an exception if the expected length is less than 0
if (estimatedLength < 0) {
throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
}
// If the factory is empty, a null pointer exception is thrown
if (factory == null) {
throw new NullPointerException("factory");
}
// Set the factory
this.factory = factory;
// Create a cache
buffer = factory.getBuffer(estimatedLength);
}
Copy the code
As you can see, the class has two properties, all of which are implemented by calling the buffer method, but the buffer generation is dynamically generated by the factory. And from the constructor point of view, HeapChannelBufferFactory is used by default.
2.ensureWritableBytes
@Override
public void ensureWritableBytes(int minWritableBytes) {
// If the minimum number of bytes written is not greater than the number of bytes that can be written, end
if (minWritableBytes <= writableBytes()) {
return;
}
// Add capacity
int newCapacity;
// The number of bytes this buffer can contain is equal to 0.
if (capacity() == 0) {
// Set the new capacity to 1
newCapacity = 1;
} else {
// The new capacity is set to the number of bytes that can be contained in the buffer
newCapacity = capacity();
}
// Minimum new capacity = Current write index + Minimum number of bytes written
int minNewCapacity = writerIndex() + minWritableBytes;
// When the new capacity is smaller than the minimum new capacity
while (newCapacity < minNewCapacity) {
// The new capacity is moved 1 bit to the left, i.e. doubled
newCapacity <<= 1;
}
// Create the capacity size by factory as buffer
ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
// Read data from buffer into newBuffer
newBuffer.writeBytes(buffer, 0, writerIndex());
// Replace the original to the buffer
buffer = newBuffer;
}
Copy the code
This method is to ensure that the array has a capacity of writable, the method is to rewrite the parent class method, by passing in a minimum number of bytes written, to increase the buffer, you can see, when the existing buffer is not large enough, will be to double buffer capacity, until the buffer is larger than the size of the incoming number minimum writing section.
3.copy
@Override
public ChannelBuffer copy(int index, int length) {
// Create a buffer with an estimated minimum length of 64 or more
DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(Math.max(length, 64), factory());
// Copy data
copiedBuffer.buffer = buffer.copy(index, length);
// Set the read index to 0 and the write index to the data length of copy
copiedBuffer.setIndex(0, length);
// return the cache
return copiedBuffer;
}
Copy the code
The method is to copy the data, create a buffer with an estimated minimum length of 64, and then reset the read index write index.
All other methods call buffer’s methods or the parent class’s methods, so I won’t go into that here.
(4) ByteBufferBackedChannelBuffer
This method inherits AbstractChannelBuffer, which is based on Java NIO ByteBuffer to implement related operations such as reading and writing data.
/** * ByteBuffer instance */
private final ByteBuffer buffer;
/** * Capacity */
private final int capacity;
public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
// Create a new byte buffer, the size of which will be the remaining capacity of this buffer
this.buffer = buffer.slice();
// Returns the remaining capacity of the buffer
capacity = buffer.remaining();
// Set write index
writerIndex(capacity);
}
Copy the code
These are the properties and constructors of this class. You can see that it has an instance of type ByteBuffer, and capacity is the remaining capacity of the buffer.
There are other methods, such as getByte, which reads data from buffer, setBytes, which writes data to buffer. There are many overloaded methods, so I won’t go through all of them, but they call some of the ByteBuffer methods. If you are not familiar with the ByteBuffer method in Java NIO, you need to first understand the ByteBuffer in Java NIO.
(5) HeapChannelBuffer
This method inherits AbstractChannelBuffer, which is implemented based on byte arrays
/** * The underlying heap byte array that this buffer is wrapped. * /
protected final byte[] array;
/** * Creates a new heap buffer with a newly allocated byte array. * *@param length the length of the new byte array
*/
public HeapChannelBuffer(int length) {
this(new byte[length], 0.0);
}
/** * Creates a new heap buffer with an existing byte array * *@param array the byte array to wrap
*/
public HeapChannelBuffer(byte[] array) {
this(array, 0, array.length);
}
/** * Creates a new heap buffer with an existing byte array * *@param array the byte array to wrap
* @param readerIndex the initial reader index of this buffer
* @param writerIndex the initial writer index of this buffer
*/
protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
if (array == null) {
throw new NullPointerException("array");
}
this.array = array;
setIndex(readerIndex, writerIndex);
}
Copy the code
This class has several constructors, all of which are byte array-based, meaning that the class wraps a byte array and passes the byte array passed in by the constructor to the property. Other approaches have simpler logic.
(6) ChannelBufferFactory
public interface ChannelBufferFactory {
/** * get the buffer instance *@param capacity
* @return* /
ChannelBuffer getBuffer(int capacity);
ChannelBuffer getBuffer(byte[] array, int offset, int length);
ChannelBuffer getBuffer(ByteBuffer nioBuffer);
}
Copy the code
This interface is a channel buffer factory, where only the methods to get the channel buffer are defined, which is easier to understand, and it has two implementation classes, which I’ll talk about later.
(7) HeapChannelBufferFactory
This class implements the ChannelBufferFactory, which is a factory for creating buffers based on byte arrays.
public class HeapChannelBufferFactory implements ChannelBufferFactory {
/** * singleton */
private static final HeapChannelBufferFactory INSTANCE = new HeapChannelBufferFactory();
public HeapChannelBufferFactory(a) {
super(a); }public static ChannelBufferFactory getInstance(a) {
return INSTANCE;
}
@Override
public ChannelBuffer getBuffer(int capacity) {
// Create a buffer of capacity
return ChannelBuffers.buffer(capacity);
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
return ChannelBuffers.wrappedBuffer(array, offset, length);
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
// Check whether the buffer is supported by byte arrays
if (nioBuffer.hasArray()) {
/ / use
return ChannelBuffers.wrappedBuffer(nioBuffer);
}
// Create a buffer with the remaining nioBuffer capacity
ChannelBuffer buf = getBuffer(nioBuffer.remaining());
// Record the nioBuffer position
int pos = nioBuffer.position();
// Write data to buF
buf.writeBytes(nioBuffer);
// Reset nioBuffer to pos
nioBuffer.position(pos);
returnbuf; }}Copy the code
This class makes use of the singleton pattern, in which a simple method is called in ChannelBuffers, which actually uses the buffer-creating method in HeapChannelBuffer.
DirectChannelBufferFactory (eight)
This class implements the ChannelBufferFactory interface, which is the direct buffer factory used to create direct buffers.
public class DirectChannelBufferFactory implements ChannelBufferFactory {
/** * singleton */
private static final DirectChannelBufferFactory INSTANCE = new DirectChannelBufferFactory();
public DirectChannelBufferFactory(a) {
super(a); }public static ChannelBufferFactory getInstance(a) {
return INSTANCE;
}
@Override
public ChannelBuffer getBuffer(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException("capacity: " + capacity);
}
if (capacity == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
// Generate a direct buffer
return ChannelBuffers.directBuffer(capacity);
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
if (array == null) {
throw new NullPointerException("array");
}
if (offset < 0) {
throw new IndexOutOfBoundsException("offset: " + offset);
}
if (length == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
if (offset + length > array.length) {
throw new IndexOutOfBoundsException("length: " + length);
}
ChannelBuffer buf = getBuffer(length);
buf.writeBytes(array, offset, length);
return buf;
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
// If nioBuffer is not read-only and it is a direct buffer
if(! nioBuffer.isReadOnly() && nioBuffer.isDirect()) {// Create a buffer
return ChannelBuffers.wrappedBuffer(nioBuffer);
}
// Create a buffer with the remaining nioBuffer capacity
ChannelBuffer buf = getBuffer(nioBuffer.remaining());
// Record the nioBuffer position
int pos = nioBuffer.position();
// Write data to buF
buf.writeBytes(nioBuffer);
// Reset nioBuffer to pos
nioBuffer.position(pos);
return buf;
}
Copy the code
The implementation in this class is similar to the implementation in HeapChannelBufferFactory, except that it creates a direct buffer.
ChannelBuffers (9)
This class is a utility class for buffers, providing common methods for creating and comparing ChannelBuffers. Here are two ways to do it:
public static ChannelBuffer wrappedBuffer(ByteBuffer buffer) {
// If the buffer has no free capacity
if(! buffer.hasRemaining()) {return EMPTY_BUFFER;
}
// If it is a byte array generated buffer
if (buffer.hasArray()) {
// Generate a new buffer using the byte array of buffer
return wrappedBuffer(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
// Create a buffer based on ByteBuffer.
return newByteBufferBackedChannelBuffer(buffer); }}Copy the code
This method uses buffer to create a new buffer. We can see that the methods of the above three classes are called. Many methods of ChannelBuffers are implemented this way, and the logic is relatively simple.
public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
// Get bufferA's readable data
final int aLen = bufferA.readableBytes();
// If two buffers have different sizes of readable data, they are not the same
if(aLen ! = bufferB.readableBytes()) {return false;
}
final int byteCount = aLen & 7;
// Get the read index of the two comparing buffers
int aIndex = bufferA.readerIndex();
int bIndex = bufferB.readerIndex();
// Compare up to 7 data in the buffer
for (int i = byteCount; i > 0; i--) {
// If one data is different, it is not the same
if(bufferA.getByte(aIndex) ! = bufferB.getByte(bIndex)) {return false;
}
aIndex++;
bIndex++;
}
return true;
}
Copy the code
This method compares whether two buffers are the same, overriding equals.
ChannelBufferOutputStream (10)
This class inherits OutputStream
Properties and constructors
/** * buffer */
private final ChannelBuffer buffer;
/** * Records the index to which the write starts */
private final int startIndex;
public ChannelBufferOutputStream(ChannelBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
this.buffer = buffer;
// Write down the index where the data was started
startIndex = buffer.writerIndex();
}
Copy the code
This class wraps a buffer object and startIndex, which is the index to which records are written.
2.writtenBytes
public int writtenBytes(a) {
return buffer.writerIndex() - startIndex;
}
Copy the code
This method returns how much data was written.
This class also contains the write method, which calls buffer.writeBytes.
(11) ChannelBufferInputStream
This class inherits from InputStream
Properties and constructors
/** * buffer */
private final ChannelBuffer buffer;
/** * Records the index of the start read data */
private final int startIndex;
/** * Finish reading index */
private final int endIndex;
public ChannelBufferInputStream(ChannelBuffer buffer) {
this(buffer, buffer.readableBytes());
}
public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (length < 0) {
throw new IllegalArgumentException("length: " + length);
}
if (length > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
this.buffer = buffer;
// Record the index where the data is started
startIndex = buffer.readerIndex();
// Set the index to end reading data
endIndex = startIndex + length;
// mark read index
buffer.markReaderIndex();
}
Copy the code
This class wraps the read start index and end index, and initializes these properties in the constructor.
2.readBytes
public int readBytes(a) {
return buffer.readerIndex() - startIndex;
}
Copy the code
This method returns how much data was read.
3.available
@Override
public int available(a) throws IOException {
return endIndex - buffer.readerIndex();
}
Copy the code
This method returns how much data is left unread
4.read
@Override
public int read(a) throws IOException {
if(! buffer.readable()) {return -1;
}
return buffer.readByte() & 0xff;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
// Determine whether there is still data to read
int available = available();
if (available == 0) {
return -1;
}
// Get the length of data to read
len = Math.min(available, len);
buffer.readBytes(b, off, len);
return len;
}
Copy the code
The method is read data and returns the length of the read data.
5.skip
@Override
public long skip(long n) throws IOException {
if (n > Integer.MAX_VALUE) {
return skipBytes(Integer.MAX_VALUE);
} else {
return skipBytes((int) n); }}private int skipBytes(int n) throws IOException {
int nBytes = Math.min(available(), n);
// Skip some data
buffer.skipBytes(nBytes);
return nBytes;
}
Copy the code
This method skips n lengths to read data.
Afterword.
The source code for this section is github.com/CrazyHZM/in…
Note that AbstractChannelBuffer has three subclasses, i.e., three forms of Buffer generation. Also note the two factories that create buffer instances. I’ll cover the Telnet section in the next article. If I didn’t write enough or made mistakes in any part, please give me your advice.