Remote communication — Buffer

Objective: To introduce the implementation logic of Buffer and walk through the source code of the buffer package in dubbo-remoting-api.

Preface

The buffer is a very important part of an NIO framework: it is the byte container, and each NIO framework has its own design for it. For example, Java NIO has ByteBuffer, Mina has IoBuffer, and Netty 4 has ByteBuf. In this article I will cover the interfaces Dubbo defines for buffers and the common logic it implements so that buffers from different frameworks can be handled in a uniform way. Here is the class diagram for this article:

I'll go through the classes in the diagram one by one, ignoring the test classes.

Source code analysis

(1) ChannelBuffer

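This interface extends the Comparable interface. It is the channel buffer interface, a byte container. Netty has the same kind of design in io.netty.buffer.ByteBuf, and the method definitions and design of this interface are almost identical to those of ByteBuf. So I won't go into details.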

(2) AbstractChannelBuffer

This class implements the ChannelBuffer interface and is the abstract base class for channel buffers. It implements the ChannelBuffer methods that deal with indexes and precondition checks, while the actual data access is left to subclasses. First, a word on how a channel buffer works, which closely follows the principle of Netty's ByteBuf. AbstractChannelBuffer maintains two indexes, one for reading and one for writing. When you read from the channel buffer, readerIndex advances by the number of bytes read; likewise, writerIndex advances by the number of bytes written.

/** read index */
private int readerIndex;

/** write index */
private int writerIndex;

/** marked read index */
private int markedReaderIndex;

/** marked write index */
private int markedWriterIndex;

As you can see, the class has four attributes. The read index and write index work as described above, and both start at position 0. The mark read index and mark write index are used for backup and rollback: while reading from or writing to the buffer, an operation may need to be undone, so the current read or write index is first saved to the corresponding mark index and can be restored later.
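To make the interplay of the indexes concrete, here is a minimal sketch of a buffer that tracks a read index, a write index, and a marked read index. The class IndexDemoBuffer and its fields are hypothetical and written only for illustration; this is not Dubbo's AbstractChannelBuffer, and the marked write index is left out for brevity.

```java
// Hypothetical illustration class; not part of Dubbo.
final class IndexDemoBuffer {
    private final byte[] data;
    private int readerIndex;        // next position to read from
    private int writerIndex;        // next position to write to
    private int markedReaderIndex;  // backup of readerIndex for rollback

    IndexDemoBuffer(int capacity) {
        this.data = new byte[capacity];
    }

    void writeByte(int value) {
        if (writerIndex >= data.length) {
            throw new IndexOutOfBoundsException("no writable bytes");
        }
        data[writerIndex++] = (byte) value;
    }

    byte readByte() {
        if (readerIndex >= writerIndex) {
            throw new IndexOutOfBoundsException("no readable bytes");
        }
        return data[readerIndex++];
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    void markReaderIndex() {
        markedReaderIndex = readerIndex;
    }

    void resetReaderIndex() {
        readerIndex = markedReaderIndex;
    }

    public static void main(String[] args) {
        IndexDemoBuffer buf = new IndexDemoBuffer(8);
        buf.writeByte(1);
        buf.writeByte(2);
        buf.writeByte(3);
        buf.markReaderIndex();                   // back up readerIndex (0)
        buf.readByte();
        buf.readByte();
        System.out.println(buf.readableBytes()); // prints 1
        buf.resetReaderIndex();                  // roll back the two reads
        System.out.println(buf.readableBytes()); // prints 3
    }
}
```

Reading never moves data around; it only advances readerIndex, which is why a rollback is as cheap as restoring one integer.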

The other methods of this class all operate on these four attributes. They check whether there is data to read or space to write, validate preconditions, and update the indexes, while the actual data access is implemented by subclasses. The logic is relatively simple, so I won't post the code.

(3) DynamicChannelBuffer

This class extends AbstractChannelBuffer and is a dynamic channel buffer: it obtains its underlying buffer from a ChannelBufferFactory, and the default factory is HeapChannelBufferFactory.

1.Properties and constructors

/** channel buffer factory */
private final ChannelBufferFactory factory;

/** channel buffer */
private ChannelBuffer buffer;

public DynamicChannelBuffer(int estimatedLength) {
    // The default is HeapChannelBufferFactory
    this(estimatedLength, HeapChannelBufferFactory.getInstance());
}

public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
    // Throw an exception if the expected length is less than 0
    if (estimatedLength < 0) {
        throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
    }
    // If the factory is empty, a null pointer exception is thrown
    if (factory == null) {
        throw new NullPointerException("factory");
    }
    // Set the factory
    this.factory = factory;
    // Create the buffer
    buffer = factory.getBuffer(estimatedLength);
}

As you can see, the class has two properties. Its operations are implemented by calling the methods of the inner buffer, and that buffer is created dynamically by the factory. The constructor also shows that HeapChannelBufferFactory is used by default.

2.ensureWritableBytes

@Override
public void ensureWritableBytes(int minWritableBytes) {
    // If the minimum number of bytes written is not greater than the number of bytes that can be written, end
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    // Add capacity
    int newCapacity;
    // The number of bytes this buffer can contain is equal to 0.
    if (capacity() == 0) {
        // Set the new capacity to 1
        newCapacity = 1;
    } else {
        // The new capacity is set to the number of bytes that can be contained in the buffer
        newCapacity = capacity();
    }
    // Minimum new capacity = Current write index + Minimum number of bytes written
    int minNewCapacity = writerIndex() + minWritableBytes;
    // When the new capacity is smaller than the minimum new capacity
    while (newCapacity < minNewCapacity) {
        // The new capacity is moved 1 bit to the left, i.e. doubled
        newCapacity <<= 1;
    }

    // Obtain a buffer of the new capacity from the factory
    ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
    // Copy the bytes written so far from the old buffer into newBuffer
    newBuffer.writeBytes(buffer, 0, writerIndex());
    // Replace the old buffer with the new one
    buffer = newBuffer;
}

This method ensures that the buffer has enough writable capacity. It overrides the parent class method and takes the minimum number of bytes that must be writable. When the existing buffer is not large enough, the capacity is doubled repeatedly until it is at least the current write index plus the requested number of bytes. A new buffer of that capacity is then obtained from the factory, and the bytes already written are copied into it.
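The doubling rule can be isolated into a small, side-effect-free function. The sketch below mirrors only the capacity calculation from the snippet above; the class CapacityGrowthDemo and the method grow are hypothetical names, and plain integers stand in for the buffer state.

```java
// Hypothetical illustration of the capacity calculation; not part of Dubbo.
final class CapacityGrowthDemo {

    /**
     * Returns the capacity the buffer would have after making room for
     * minWritableBytes. Like the quoted code, there is no overflow guard.
     */
    static int grow(int capacity, int writerIndex, int minWritableBytes) {
        // Enough room already: keep the current capacity
        if (minWritableBytes <= capacity - writerIndex) {
            return capacity;
        }
        // Start from the current capacity, or from 1 if it is 0
        int newCapacity = capacity == 0 ? 1 : capacity;
        int minNewCapacity = writerIndex + minWritableBytes;
        // Double until the requested bytes fit
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return newCapacity;
    }

    public static void main(String[] args) {
        System.out.println(grow(16, 10, 4));  // prints 16: 6 bytes are free, 4 needed
        System.out.println(grow(16, 10, 30)); // prints 64: 16 -> 32 -> 64, at least 40 needed
        System.out.println(grow(0, 0, 5));    // prints 8: 1 -> 2 -> 4 -> 8
    }
}
```

Doubling keeps the number of reallocations logarithmic in the final size, at the cost of sometimes allocating more than is strictly needed.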

3.copy

@Override
public ChannelBuffer copy(int index, int length) {
    // Create a buffer with an estimated minimum length of 64 or more
    DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(Math.max(length, 64), factory());
    // Copy data
    copiedBuffer.buffer = buffer.copy(index, length);
    // Set the read index to 0 and the write index to the data length of copy
    copiedBuffer.setIndex(0, length);
    // Return the copied buffer
    return copiedBuffer;
}

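This method copies a range of data. It creates a new dynamic buffer with an estimated length of at least 64, replaces its inner buffer with a copy of the requested range, and then sets the read index to 0 and the write index to the copied length.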

All other methods call buffer’s methods or the parent class’s methods, so I won’t go into that here.

(4) ByteBufferBackedChannelBuffer

This class extends AbstractChannelBuffer and implements operations such as reading and writing data on top of a Java NIO ByteBuffer.

/** ByteBuffer instance */
private final ByteBuffer buffer;

/** Capacity */
private final int capacity;

public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    // Create a new byte buffer, the size of which will be the remaining capacity of this buffer
    this.buffer = buffer.slice();
    // Returns the remaining capacity of the buffer
    capacity = buffer.remaining();
    // Set write index
    writerIndex(capacity);
}

These are the properties and constructors of this class. You can see that it has an instance of type ByteBuffer, and capacity is the remaining capacity of the buffer.

There are other methods, such as getByte, which reads data from the buffer, and setBytes, which writes data to it. There are many overloads, so I won't go through them all; they simply call the corresponding ByteBuffer methods. If you are not familiar with ByteBuffer in Java NIO, it is worth learning that first.
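For readers new to ByteBuffer, the following sketch shows what slice() and remaining() do in the constructor above, using only the JDK. The class SliceDemo and the helper sliceAt are hypothetical names introduced for this example.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration using only java.nio; not part of Dubbo.
final class SliceDemo {

    /** Moves the position of original and returns a slice starting there. */
    static ByteBuffer sliceAt(ByteBuffer original, int position) {
        original.position(position);
        return original.slice();
    }

    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(10);
        ByteBuffer slice = sliceAt(original, 4);

        // remaining() is limit - position, here 10 - 4
        System.out.println(original.remaining()); // prints 6
        // The slice covers exactly the remaining bytes and starts at position 0
        System.out.println(slice.capacity());     // prints 6
        System.out.println(slice.position());     // prints 0

        // The slice shares content with the original buffer
        slice.put(0, (byte) 42);
        System.out.println(original.get(4));      // prints 42
    }
}
```

Because the slice has its own position and limit, later changes to the indexes of the original ByteBuffer do not affect the wrapped view.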

(5) HeapChannelBuffer

This class extends AbstractChannelBuffer and is implemented on top of a byte array.

/**
 * The underlying heap byte array that this buffer is wrapping.
 */
protected final byte[] array;

/**
 * Creates a new heap buffer with a newly allocated byte array.
 *
 * @param length the length of the new byte array
 */
public HeapChannelBuffer(int length) {
    this(new byte[length], 0, 0);
}

/**
 * Creates a new heap buffer with an existing byte array.
 *
 * @param array the byte array to wrap
 */
public HeapChannelBuffer(byte[] array) {
    this(array, 0, array.length);
}

/**
 * Creates a new heap buffer with an existing byte array.
 *
 * @param array       the byte array to wrap
 * @param readerIndex the initial reader index of this buffer
 * @param writerIndex the initial writer index of this buffer
 */
protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
    if (array == null) {
        throw new NullPointerException("array");
    }
    this.array = array;
    setIndex(readerIndex, writerIndex);
}

This class has several constructors, all of them based on a byte array: the class wraps a byte array, and the constructors assign the array passed in (or a newly allocated one) to the array property. The other methods have simple logic.

(6) ChannelBufferFactory

public interface ChannelBufferFactory {

    /**
     * get the buffer instance
     *
     * @param capacity
     * @return
     */
    ChannelBuffer getBuffer(int capacity);

    ChannelBuffer getBuffer(byte[] array, int offset, int length);

    ChannelBuffer getBuffer(ByteBuffer nioBuffer);

}

This interface is the channel buffer factory. It only defines methods for obtaining a channel buffer, so it is easy to understand. It has two implementation classes, which are covered next.

(7) HeapChannelBufferFactory

This class implements the ChannelBufferFactory interface and is a factory for creating buffers backed by byte arrays.

public class HeapChannelBufferFactory implements ChannelBufferFactory {

    /** singleton */
    private static final HeapChannelBufferFactory INSTANCE = new HeapChannelBufferFactory();

    public HeapChannelBufferFactory() {
        super();
    }

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public ChannelBuffer getBuffer(int capacity) {
        // Create a buffer of capacity
        return ChannelBuffers.buffer(capacity);
    }

    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
        return ChannelBuffers.wrappedBuffer(array, offset, length);
    }

    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        // Check whether the nioBuffer is backed by an accessible byte array
        if (nioBuffer.hasArray()) {
            // If so, wrap it directly
            return ChannelBuffers.wrappedBuffer(nioBuffer);
        }

        // Create a buffer whose capacity is the number of remaining bytes in nioBuffer
        ChannelBuffer buf = getBuffer(nioBuffer.remaining());
        // Record the nioBuffer position
        int pos = nioBuffer.position();
        // Write data to buf
        buf.writeBytes(nioBuffer);
        // Reset nioBuffer to pos
        nioBuffer.position(pos);
        return buf;
    }
}

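This class uses the singleton pattern. Its methods simply call the helper methods in ChannelBuffers, which in turn use the buffer-creating methods of HeapChannelBuffer. When the ByteBuffer has no accessible backing array, the remaining bytes are copied into a new buffer and the position of the source is restored afterwards.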
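The last branch of getBuffer(ByteBuffer) copies the remaining bytes and then puts the source position back where it was. Here is a minimal sketch of that pattern using only java.nio.ByteBuffer. The names PositionPreservingCopy and copyRemaining are hypothetical, and a plain byte array stands in for the ChannelBuffer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration using only the JDK; not part of Dubbo.
final class PositionPreservingCopy {

    /** Copies the remaining bytes of source without changing its position. */
    static byte[] copyRemaining(ByteBuffer source) {
        byte[] target = new byte[source.remaining()];
        // Record the position, because the relative bulk get advances it
        int pos = source.position();
        source.get(target);
        // Restore the position so the caller sees the source unchanged
        source.position(pos);
        return target;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.put(new byte[] {1, 2, 3, 4});
        direct.flip();
        direct.get(); // consume one byte, position is now 1

        byte[] copy = copyRemaining(direct);
        System.out.println(Arrays.toString(copy)); // prints [2, 3, 4]
        System.out.println(direct.position());     // prints 1
    }
}
```

Restoring the position matters because the caller may still want to read the same bytes from the original ByteBuffer.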

(8) DirectChannelBufferFactory

This class implements the ChannelBufferFactory interface and is the factory used to create direct buffers.

public class DirectChannelBufferFactory implements ChannelBufferFactory {

    /** singleton */
    private static final DirectChannelBufferFactory INSTANCE = new DirectChannelBufferFactory();

    public DirectChannelBufferFactory() {
        super();
    }

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public ChannelBuffer getBuffer(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity: " + capacity);
        }
        if (capacity == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }
        // Generate a direct buffer
        return ChannelBuffers.directBuffer(capacity);
    }

    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
        if (array == null) {
            throw new NullPointerException("array");
        }
        if (offset < 0) {
            throw new IndexOutOfBoundsException("offset: " + offset);
        }
        if (length == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }
        if (offset + length > array.length) {
            throw new IndexOutOfBoundsException("length: " + length);
        }

        ChannelBuffer buf = getBuffer(length);
        buf.writeBytes(array, offset, length);
        return buf;
    }

    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        // If nioBuffer is not read-only and it is a direct buffer
        if (!nioBuffer.isReadOnly() && nioBuffer.isDirect()) {
            // Create a buffer that wraps it
            return ChannelBuffers.wrappedBuffer(nioBuffer);
        }

        // Create a buffer whose capacity is the number of remaining bytes in nioBuffer
        ChannelBuffer buf = getBuffer(nioBuffer.remaining());
        // Record the nioBuffer position
        int pos = nioBuffer.position();
        // Write data to buf
        buf.writeBytes(nioBuffer);
        // Reset nioBuffer to pos
        nioBuffer.position(pos);
        return buf;
    }
}

The implementation in this class is similar to the implementation in HeapChannelBufferFactory, except that it creates a direct buffer.
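Both factories pick their fast path by asking the ByteBuffer what kind of buffer it is: hasArray() in the heap factory, isReadOnly() and isDirect() in the direct factory. The sketch below prints those flags for a few JDK buffers. The class HeapVsDirectDemo and the method describe are hypothetical names used only here.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration using only the JDK; not part of Dubbo.
final class HeapVsDirectDemo {

    /** Summarizes the two flags the factories above branch on. */
    static String describe(ByteBuffer buffer) {
        return "hasArray=" + buffer.hasArray() + ", isDirect=" + buffer.isDirect();
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(8);
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        ByteBuffer readOnlyHeap = heap.asReadOnlyBuffer();

        // A writable heap buffer exposes its backing array
        System.out.println(describe(heap));         // prints hasArray=true, isDirect=false
        // A direct buffer lives outside the Java heap; whether it exposes
        // an array is implementation-specific, so no output is claimed here
        System.out.println(describe(direct));
        // A read-only buffer does not expose a backing array
        System.out.println(describe(readOnlyHeap)); // prints hasArray=false, isDirect=false
    }
}
```

This is why the heap factory still needs the copy branch: a read-only or direct ByteBuffer gives it no array to wrap.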

(9) ChannelBuffers

This class is a utility class for buffers, providing common methods for creating and comparing ChannelBuffer instances. Here are two of its methods:

public static ChannelBuffer wrappedBuffer(ByteBuffer buffer) {
    // If the buffer has no remaining bytes, return the empty buffer
    if (!buffer.hasRemaining()) {
        return EMPTY_BUFFER;
    }
    // If the buffer is backed by a byte array
    if (buffer.hasArray()) {
        // Generate a new buffer using the byte array of buffer
        return wrappedBuffer(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    } else {
        // Create a buffer based on ByteBuffer
        return new ByteBufferBackedChannelBuffer(buffer);
    }
}

This method wraps a ByteBuffer in a ChannelBuffer. Depending on the case, it returns EMPTY_BUFFER, wraps the backing byte array, or creates a ByteBufferBackedChannelBuffer, so it relies on the classes described above. Many methods of ChannelBuffers are implemented this way, and the logic is relatively simple.

public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
    // Get the number of readable bytes in bufferA
    final int aLen = bufferA.readableBytes();
    // If the two buffers have different numbers of readable bytes, they are not equal
    if (aLen != bufferB.readableBytes()) {
        return false;
    }

    final int byteCount = aLen & 7;

    // Get the read index of the two buffers being compared
    int aIndex = bufferA.readerIndex();
    int bIndex = bufferB.readerIndex();

    // Compare up to 7 bytes in the buffers
    for (int i = byteCount; i > 0; i--) {
        // If one byte differs, the buffers are not equal
        if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
            return false;
        }
        aIndex++;
        bIndex++;
    }

    return true;
}

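This method compares the readable content of two buffers. It is a static helper that takes both buffers as arguments rather than an override of Object.equals. Note that, as quoted here, the loop inspects only aLen & 7 bytes, that is, at most 7 bytes starting at each read index.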
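To see how many bytes such a loop actually inspects, the following sketch applies the same aLen & 7 expression to plain byte arrays. The class TailCountDemo and both methods are hypothetical; they mirror only the snippet as quoted in this article and make no claim about other versions of the Dubbo source.

```java
// Hypothetical illustration of the "length & 7" loop bound; not part of Dubbo.
final class TailCountDemo {

    /** For non-negative lengths, length & 7 is the remainder of length / 8. */
    static int tailCount(int length) {
        return length & 7;
    }

    /** Same loop shape as the quoted method, applied to plain byte arrays. */
    static boolean tailEquals(byte[] a, byte[] b) {
        if (a.length != b.length) {
            return false;
        }
        for (int i = 0; i < tailCount(a.length); i++) {
            if (a[i] != b[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(tailCount(5));  // prints 5
        System.out.println(tailCount(8));  // prints 0
        System.out.println(tailCount(13)); // prints 5

        byte[] x = new byte[8];
        byte[] y = new byte[8];
        y[7] = 1; // the arrays differ in the last byte
        // With length 8 the loop runs zero times, so the difference is not seen
        System.out.println(tailEquals(x, y)); // prints true
    }
}
```

A comparison that must cover every byte would loop over the full length instead of only the remainder.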

(10) ChannelBufferOutputStream

This class extends OutputStream.

1.Properties and constructors

/** buffer */
private final ChannelBuffer buffer;
/** Records the index at which writing starts */
private final int startIndex;

public ChannelBufferOutputStream(ChannelBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    this.buffer = buffer;
    // Record the index at which writing starts
    startIndex = buffer.writerIndex();
}

This class wraps a buffer and records startIndex, the write index of the buffer at the moment the stream is created.

2.writtenBytes

public int writtenBytes() {
    return buffer.writerIndex() - startIndex;
}

This method returns how much data was written.

This class also contains the write method, which calls buffer.writeBytes.
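The same bookkeeping can be sketched on top of a JDK ByteBuffer, whose position plays the role of the write index. The class PositionTrackingStream is a hypothetical stand-in written for this example and is not the Dubbo class.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical stand-in that uses a JDK ByteBuffer; not part of Dubbo.
final class PositionTrackingStream extends OutputStream {
    private final ByteBuffer buffer;
    /** Position of the buffer when the stream was created. */
    private final int startIndex;

    PositionTrackingStream(ByteBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.buffer = buffer;
        this.startIndex = buffer.position();
    }

    @Override
    public void write(int b) {
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        buffer.put(b, off, len);
    }

    /** Number of bytes written through this stream so far. */
    int writtenBytes() {
        return buffer.position() - startIndex;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put((byte) 9); // written before the stream exists, so not counted

        PositionTrackingStream out = new PositionTrackingStream(buf);
        out.write(1);
        out.write(new byte[] {2, 3, 4}, 0, 3);
        System.out.println(out.writtenBytes()); // prints 4
    }
}
```

Recording the start index once means the stream never needs its own counter; the difference of two indexes is always up to date.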

(11) ChannelBufferInputStream

This class extends InputStream.

1.Properties and constructors

/** buffer */
private final ChannelBuffer buffer;
/** Records the index at which reading starts */
private final int startIndex;
/** The index at which reading ends */
private final int endIndex;

public ChannelBufferInputStream(ChannelBuffer buffer) {
    this(buffer, buffer.readableBytes());
}

public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    if (length < 0) {
        throw new IllegalArgumentException("length: " + length);
    }
    if (length > buffer.readableBytes()) {
        throw new IndexOutOfBoundsException();
    }

    this.buffer = buffer;
    // Record the index where the data is started
    startIndex = buffer.readerIndex();
    // Set the index to end reading data
    endIndex = startIndex + length;
    // mark read index
    buffer.markReaderIndex();
}

This class records the index at which reading starts and the index at which it ends, and initializes both in the constructor.

2.readBytes

public int readBytes() {
    return buffer.readerIndex() - startIndex;
}

This method returns how much data was read.

3.available

@Override
public int available() throws IOException {
    return endIndex - buffer.readerIndex();
}

This method returns how much data is left unread.

4.read

@Override
public int read() throws IOException {
    if (!buffer.readable()) {
        return -1;
    }
    return buffer.readByte() & 0xff;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
    // Determine whether there is still data to read
    int available = available();
    if (available == 0) {
        return -1;
    }

    // Get the length of data to read
    len = Math.min(available, len);
    buffer.readBytes(b, off, len);
    return len;
}


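These methods read data. The first returns the next byte as an unsigned value in the range 0 to 255, or -1 if nothing is readable. The second reads up to len bytes and returns the number of bytes actually read, or -1 if none are available.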

5.skip

@Override
public long skip(long n) throws IOException {
    if (n > Integer.MAX_VALUE) {
        return skipBytes(Integer.MAX_VALUE);
    } else {
        return skipBytes((int) n);
    }
}

private int skipBytes(int n) throws IOException {
    int nBytes = Math.min(available(), n);
    // Skip nBytes bytes
    buffer.skipBytes(nBytes);
    return nBytes;
}


This method skips up to n bytes of data, limited by the number of bytes still available, and returns the number of bytes actually skipped.
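The available, read, and skip logic can be tried out on a plain byte array. In the sketch below, BoundedArrayStream is a hypothetical class written for this article: an array index stands in for the read index of the buffer, and the bounds follow the same start and end index idea as above.

```java
import java.io.InputStream;

// Hypothetical illustration over a byte array; not part of Dubbo.
final class BoundedArrayStream extends InputStream {
    private final byte[] data;
    private int readerIndex;
    /** Index at which reading ends (exclusive). */
    private final int endIndex;

    BoundedArrayStream(byte[] data, int start, int length) {
        if (start < 0 || length < 0 || start + length > data.length) {
            throw new IndexOutOfBoundsException();
        }
        this.data = data;
        this.readerIndex = start;
        this.endIndex = start + length;
    }

    @Override
    public int available() {
        return endIndex - readerIndex;
    }

    @Override
    public int read() {
        if (available() == 0) {
            return -1;
        }
        // Mask to 0..255 so a data byte can never be confused with -1
        return data[readerIndex++] & 0xff;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        int available = available();
        if (available == 0) {
            return -1;
        }
        // Never read more than what is left
        len = Math.min(available, len);
        System.arraycopy(data, readerIndex, b, off, len);
        readerIndex += len;
        return len;
    }

    @Override
    public long skip(long n) {
        // Skip at most the bytes that are still available
        int nBytes = (int) Math.min((long) available(), Math.max(n, 0L));
        readerIndex += nBytes;
        return nBytes;
    }

    public static void main(String[] args) {
        byte[] data = {10, (byte) 200, 30, 40, 50};
        BoundedArrayStream in = new BoundedArrayStream(data, 1, 3);
        System.out.println(in.available()); // prints 3
        System.out.println(in.read());      // prints 200
        System.out.println(in.skip(10));    // prints 2
        System.out.println(in.read());      // prints -1
    }
}
```

The & 0xff mask is what lets read() use -1 as an end-of-stream marker: the byte 200 is stored as -56, and without the mask a stored -1 would look like the end of the data.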

Afterword

The source code for this section is github.com/CrazyHZM/in…

Note that AbstractChannelBuffer has three subclasses, that is, three ways of backing a buffer. Also note the two factories that create buffer instances. I'll cover the Telnet part in the next article. If anything here is incomplete or wrong, please let me know.