Original link: juejin.cn/post/684490…


The ByteBuf type of Netty

  • Pooled, Unpooled

  • Direct (direct buffer, off-heap), Heap (inside the JVM heap)

  • Unsafe (backed by the native methods of sun.misc.Unsafe), Safe (not usually named explicitly; simply the opposite of Unsafe, i.e. operations that stay inside the JVM heap)

By default, Netty prefers the Unsafe implementation when it is available.
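As a quick illustration of these flavors, here is a minimal sketch (assuming Netty 4.x on the classpath and a platform where sun.misc.Unsafe is available) that allocates buffers through the pooled and unpooled allocators and prints the concrete class Netty picked:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class ByteBufFlavors {
    public static void main(String[] args) {
        // Pooled + direct: with Unsafe available this is typically a PooledUnsafeDirectByteBuf
        ByteBuf pooledDirect = PooledByteBufAllocator.DEFAULT.directBuffer(256);
        // Pooled + heap
        ByteBuf pooledHeap = PooledByteBufAllocator.DEFAULT.heapBuffer(256);
        // Unpooled + direct
        ByteBuf unpooledDirect = UnpooledByteBufAllocator.DEFAULT.directBuffer(256);

        System.out.println(pooledDirect.getClass().getSimpleName());
        System.out.println(pooledHeap.getClass().getSimpleName());
        System.out.println(unpooledDirect.getClass().getSimpleName());

        // Always release buffers so pooled memory goes back to the pool
        pooledDirect.release();
        pooledHeap.release();
        unpooledDirect.release();
    }
}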


Pooling/unpooling (Pooled/Unpooled)

Netty first requests a contiguous block of memory to act as the ByteBuf pool. When a ByteBuf is needed, Netty takes space from the pool and gives it back to the pool after use, instead of requesting new space every time a ByteBuf is used. This matters because allocating off-heap memory takes much longer than creating objects inside the heap.

Summary: the purpose of pooling is to speed up obtaining objects to operate on and to reduce the overhead of frequently creating and destroying them.
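A minimal sketch of that borrow-and-return cycle (assuming Netty 4.x; the buffer size and loop count are arbitrary): releasing a pooled buffer returns its memory to the pool, so later allocations can reuse it instead of asking the OS again.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PoolReuseSketch {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // Take a buffer from the pool (served from a pre-reserved chunk when possible)
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256);
            buf.writeInt(i);
            // release() gives the space back to the pool rather than freeing it outright
            buf.release();
        }
    }
}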


Off-heap/in-heap (Direct/Heap)

In-heap means the data lives inside the JVM heap; allocation and all operations happen within the JVM.

A direct (off-heap) buffer is memory requested through native methods outside the JVM heap. This memory can be used by the OS directly because it is physically contiguous. Heap memory, by contrast, is only logically contiguous, not physically contiguous, and its address may change after a GC, so the OS reading the old address would get the wrong data. The JDK therefore handles writes from heap buffers by first copying the data into a temporary direct buffer (physically contiguous; concretely, IOUtil#write() calls Util.getTemporaryDirectBuffer to stage the heap data in a temporary direct buffer). The Java-side object (DirectByteBuffer) only keeps bookkeeping such as the memory address, reader/writer indexes, and offsets; both reads and writes operate on the off-heap data through native calls.

Conclusion:

  • Using off-heap memory directly avoids the copy from the heap to a temporary off-heap buffer. Heap addresses are only logically contiguous, while network and file I/O need physically contiguous memory, and if a GC runs while data is being written, addresses may change and the OS would no longer find the right data.

  • Off-heap memory also reduces GC pressure, and in some scenarios (such as request forwarding) the copy from off-heap to in-heap can be skipped entirely, improving efficiency.
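A small sketch of the copy the JDK does behind the scenes (the file name is just for illustration): when a heap ByteBuffer is written to a channel, the data is first staged into a temporary direct buffer inside the JDK's I/O path, whereas a direct ByteBuffer is handed to the native write as-is.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class HeapVsDirectWrite {
    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes();

        ByteBuffer heapBuf = ByteBuffer.wrap(payload);            // backed by a byte[] on the JVM heap
        ByteBuffer directBuf = ByteBuffer.allocateDirect(payload.length);
        directBuf.put(payload).flip();                            // lives outside the heap, physically contiguous

        try (FileChannel ch = FileChannel.open(Paths.get("demo.txt"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // Heap buffer: the JDK copies it into a temporary direct buffer before the native write
            ch.write(heapBuf);
            // Direct buffer: no extra copy, the native write uses its address directly
            ch.write(directBuf);
        }
    }
}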


unsafe

Unsafe is a class in sun.misc that manipulates memory directly through native methods, which is more efficient. Allocating and manipulating off-heap memory is also done through Unsafe. However, using Unsafe requires being very familiar with memory operations, otherwise it is very error-prone, which explains why it is officially called "unsafe".

Summary: direct memory manipulation, efficient, but easy to misuse.
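A minimal sketch of what "direct memory manipulation" looks like with sun.misc.Unsafe (the instance is obtained reflectively because the constructor is not public); get this wrong, for example free twice or read past the allocation, and the JVM can crash rather than throw:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeSketch {
    public static void main(String[] args) throws Exception {
        // The singleton is hidden behind a private static field
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        // Allocate 16 bytes of off-heap memory and write/read it directly by address
        long address = unsafe.allocateMemory(16);
        unsafe.putLong(address, 42L);
        System.out.println(unsafe.getLong(address)); // 42

        // Off-heap memory is not GC-managed: forget this call and it leaks
        unsafe.freeMemory(address);
    }
}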




Some of the classes and concepts used in Pool

PoolArena, PoolChunk, PoolThreadLocalCache, PoolSubpage, Recycler

  • PoolArena: an arena is a stage; as the name implies, operations in the pool need this class to provide the environment they run in
  • PoolChunk: a block of memory requested by Netty, storing information such as chunkSize, offset, and the remaining free space freeSize. To find a run of at least the requested size inside the chunk, its pages are organized as a complete binary tree, similar to a heap structure
  • PoolThreadLocalCache: a thread-local cache along the path PoolArena -> chunk (-> page -> subPage)
  • PoolSubpage: a subdivision of a page at the bottom level of a chunk
  • Recycler: an abstract class that fetches ByteBuf instances from a ThreadLocal

PoolThreadLocalCache and Recycler both use ThreadLocal variables to reduce contention between threads and improve efficiency.


Several important attribute values.

MaxOrder, default 11: the depth of the complete binary tree (the root is layer 0, so there are actually maxOrder + 1 layers)

PageSize, default 8192 (8K): the size of a page, i.e. of a leaf node at the bottom of the complete binary tree above

PageShifts, default 13: the base-2 logarithm of pageSize, so 2^pageShifts = pageSize; since pageSize defaults to 8192, this defaults to 13

ChunkSize, default 16M (pageSize * 2^maxOrder): the size of each chunk, which is also the total size of each layer in the chunk diagram below

The minimum allocation unit inside a page is 16 bytes. This number is important and appears in several key calculations
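A quick sketch of how these defaults relate to each other (just reproducing the arithmetic above):

public class PoolDefaults {
    public static void main(String[] args) {
        int pageSize = 8192;                                       // default page size (8K)
        int maxOrder = 11;                                         // default tree depth
        int pageShifts = Integer.numberOfTrailingZeros(pageSize);  // 13, because 2^13 = 8192
        int chunkSize = pageSize << maxOrder;                      // 8192 * 2^11 = 16M

        System.out.println(pageShifts);                      // 13
        System.out.println(chunkSize / (1024 * 1024) + "M"); // 16M
    }
}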


ByteBuf size type:

  • size < 512: tiny
  • 512 <= size < 8192: small
  • 8192 <= size <= 16M: normal
  • size > 16M: huge
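A hypothetical helper that mirrors these thresholds (the names classify/SizeClass are made up for illustration; Netty itself uses checks such as isTiny/isTinyOrSmall on normalized sizes):

public class SizeClassSketch {
    enum SizeClass { TINY, SMALL, NORMAL, HUGE }

    static SizeClass classify(int size) {
        if (size < 512) return SizeClass.TINY;
        if (size < 8192) return SizeClass.SMALL;                 // 512 <= size < pageSize
        if (size <= 16 * 1024 * 1024) return SizeClass.NORMAL;   // up to chunkSize (16M)
        return SizeClass.HUGE;
    }

    public static void main(String[] args) {
        System.out.println(classify(100));              // TINY
        System.out.println(classify(4096));             // SMALL
        System.out.println(classify(8192));             // NORMAL
        System.out.println(classify(32 * 1024 * 1024)); // HUGE
    }
}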


The structure of the chunk

Each layer of the tree adds up to 16M, the size of one chunk. Subdividing down to the bottom layer, each page is 8192 bytes (8K), so the bottom layer has 2048 (2K) nodes. Naturally, not all of them are drawn here; subPages are carved out of a page.
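A small sketch that prints the shape of that tree under the default settings (each layer has 2^depth nodes, each node managing chunkSize >> depth bytes, so every layer covers the full 16M):

public class ChunkTreeShape {
    public static void main(String[] args) {
        int pageSize = 8192;
        int maxOrder = 11;
        int chunkSize = pageSize << maxOrder; // 16M

        for (int depth = 0; depth <= maxOrder; depth++) {
            int nodes = 1 << depth;              // number of nodes on this layer
            int nodeSize = chunkSize >>> depth;  // bytes managed by each node
            System.out.println("depth " + depth + ": " + nodes + " nodes of " + nodeSize + " bytes");
        }
        // depth 11: 2048 nodes of 8192 bytes -- the pages at the bottom of the tree
    }
}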




Off-heap vs. in-heap ByteBuffer allocation

A simple test

A simple test comparing the time to allocate off-heap versus in-heap memory:

    static void nioAllocTest() {
        int num = 10;
        int cnt = 100;
        int size = 256;
        ByteBuffer buf;

        long start1, end1, start2, end2;
        long sum1, sum2;
        for (int i = 0; i < num; i++) {
            sum1 = sum2 = 0;
            int j;
            for (j = 0; j < cnt; j++) {
                start1 = System.nanoTime();
                buf = ByteBuffer.allocateDirect(size);
                end1 = System.nanoTime();
                sum1 += (end1 - start1);

                start2 = System.nanoTime();
                buf = ByteBuffer.allocate(size);
                end2 = System.nanoTime();
                sum2 += (end2 - start2);
            }
            System.out.println(String.format(
                    "Round %s: %s allocations of %s bytes, average time [direct: %s, heap: %s]",
                    i, j, size, sum1 / cnt, sum2 / cnt));
        }
    }

The output is:

Round 0: 100 allocations of 256 bytes, average time [direct: 4864, heap: 1616]
Round 1: 100 allocations of 256 bytes, average time [direct: 5763, heap: 1641]
Round 2: 100 allocations of 256 bytes, average time [direct: 4771, heap: 1672]
Round 3: 100 allocations of 256 bytes, average time [direct: 4961, heap: 883]
Round 4: 100 allocations of 256 bytes, average time [direct: 3556, heap: 870]
Round 5: 100 allocations of 256 bytes, average time [direct: 5159, heap: 726]
Round 6: 100 allocations of 256 bytes, average time [direct: 3739, heap: 843]
Round 7: 100 allocations of 256 bytes, average time [direct: 3910, heap: 221]
Round 8: 100 allocations of 256 bytes, average time [direct: 2191, heap: 590]
Round 9: 100 allocations of 256 bytes, average time [direct: 1624, heap: 615]

As you can see, allocating direct off-heap memory takes significantly longer than allocating inside the JVM heap, in this case several times longer.


Pool/unpool

A simple test

A simple test of the effect of pooling:

    static void nettyPooledTest() {
        try {
            int num = 10;
            int cnt = 100;
            int size = 8192;
            ByteBuf direct1, direct2, heap1, heap2;

            long start1, end1, start2, end2, start3, end3, start4, end4;
            long sum1, sum2, sum3, sum4;
            for (int i = 0; i<num; i++) {
                sum1 = sum2 = sum3 = sum4 = 0;
                int j;
                for (j = 0; j<cnt; j++) {

                    start1 = System.nanoTime();
                    direct1 = PooledByteBufAllocator.DEFAULT.directBuffer(size);
                    end1 = System.nanoTime();
                    sum1 += (end1-start1);

                    start2 = System.nanoTime();
                    direct2 = UnpooledByteBufAllocator.DEFAULT.directBuffer(size);
                    end2 = System.nanoTime();
                    sum2 += (end2-start2);

                    start3 = System.nanoTime();
                    heap1 = PooledByteBufAllocator.DEFAULT.heapBuffer(size);
                    end3 = System.nanoTime();
                    sum3 += (end3-start3);

                    start4 = System.nanoTime();
                    heap2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size);
                    end4 = System.nanoTime();
                    sum4 += (end4-start4);

                    direct1.release();
                    direct2.release();
                    heap1.release();
                    heap2.release();
                }
                System.out.println(String.format(
                        "Netty round %s: %s allocations of [%s] bytes, average time [direct.pooled: [%s], direct.unpooled: [%s], heap.pooled: [%s], heap.unpooled: [%s]]",
                        i, j, size, sum1 / cnt, sum2 / cnt, sum3 / cnt, sum4 / cnt));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }

Final output:

Netty round 0: 100 allocations of [8192] bytes, average time [direct.pooled: [1784931], direct.unpooled: [105310], heap.pooled: [202306], heap.unpooled: [23317]]
Netty round 1: 100 allocations of [8192] bytes, average time [direct.pooled: [12849], direct.unpooled: [15457], heap.pooled: [12671], heap.unpooled: [12693]]
Netty round 2: 100 allocations of [8192] bytes, average time [direct.pooled: [13589], direct.unpooled: [14459], heap.pooled: [18783], heap.unpooled: [13803]]
Netty round 3: 100 allocations of [8192] bytes, average time [direct.pooled: [10185], direct.unpooled: [11644], heap.pooled: [9809], heap.unpooled: [12770]]
Netty round 4: 100 allocations of [8192] bytes, average time [direct.pooled: [15980], direct.unpooled: [53980], heap.pooled: [5641], heap.unpooled: [12467]]
Netty round 5: 100 allocations of [8192] bytes, average time [direct.pooled: [4903], direct.unpooled: [34215], heap.pooled: [6659], heap.unpooled: [12311]]
Netty round 6: 100 allocations of [8192] bytes, average time [direct.pooled: [2445], direct.unpooled: [7197], heap.pooled: [2849], heap.unpooled: [11010]]
Netty round 7: 100 allocations of [8192] bytes, average time [direct.pooled: [2578], direct.unpooled: [4750], heap.pooled: [3904], heap.unpooled: [255689]]
Netty round 8: 100 allocations of [8192] bytes, average time [direct.pooled: [1855], direct.unpooled: [3492], heap.pooled: [37822], heap.unpooled: [3983]]
Netty round 9: 100 allocations of [8192] bytes, average time [direct.pooled: [1932], direct.unpooled: [2961], heap.pooled: [1825], heap.unpooled: [6098]]

It can be seen that once ByteBuf is pooled, allocation becomes noticeably faster after the first warm-up rounds. Frequently requesting off-heap memory hurts server performance, and this is where pooling pays off: a large block of memory is requested once up front, and afterwards objects are simply taken from the pool and returned to it, so each use no longer needs its own off-heap allocation.




ByteBuf concrete implementation

Here we focus on the most important implementation, the one Netty uses by default: PooledUnsafeDirectByteBuf. We also start from its allocation entry point, PooledByteBufAllocator.DEFAULT.directBuffer().

Create/retrieve /reuse — new() && allocate && get()/reuse()

The walkthrough below starts from PooledByteBufAllocator.DEFAULT.directBuffer():

  // The first method to analyze
  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
      // Get a thread-local cache pool from the ThreadLocal
      PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
      // This cache pool contains both heap and direct
      PoolArena<ByteBuffer> directArena = cache.directArena;
      Object buf;
      if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity); // Down here -- 1
      } else {
        // If no off-heap arena is available, allocate an unpooled direct ByteBuf, preferring the Unsafe variant
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
      }

      return toLeakAwareBuffer((ByteBuf)buf);
    }

  // 1 directArena.allocate(cache, initialCapacity, maxCapacity);
  PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
      // newByteBuf(maxCapacity); There are two implementations, directArena and heapArena
      // Reuse a pooled ByteBuf via the Recycler
      PooledByteBuf<T> buf = newByteBuf(maxCapacity); // -- 2
      allocate(cache, buf, reqCapacity); // -- 7
      return buf;
    }
	
  // 2 newByteBuf(maxCapacity)
  protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
      // PooledUnsafeDirect is preferred
      if (HAS_UNSAFE) {
        // PooledUnsafeDirect
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); // -- 3
      } else {
        // PooledDirect
        return PooledDirectByteBuf.newInstance(maxCapacity);
      }
    }

  // 3 PooledUnsafeDirectByteBuf.newInstance
  static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
      // Get a ByteBuf from the ThreadLocal-based Recycler
      PooledUnsafeDirectByteBuf buf = RECYCLER.get();	// -- 4
      // reset the subscript of ByteBuf etc
      buf.reuse(maxCapacity);	// -- 6
      return buf;
    }

  // 4 Recycler.get()
  public final T get() {
      if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
      }
      // Each thread has a stack
      Stack<T> stack = threadLocal.get();
      // Pop up a handle
      DefaultHandle<T> handle = stack.pop();
      // If the stack does not have a Handle, create a new one
      if (handle == null) {
        handle = stack.newHandle();
        // newObject is implemented by the caller; each ByteBuf type creates its own instance
        // handle.value is the ByteBuf; following the call chain from the top, here it is a PooledUnsafeDirectByteBuf
        handle.value = newObject(handle); // -- 5
      }
      // Return a ByteBuf
      return (T) handle.value;
    }
		
  // 5 stack.pop() fetches a handle from the Stack
  DefaultHandle<T> pop() {
      int size = this.size;
      if (size == 0) {
        if (!scavenge()) {
          return null;
        }
        size = this.size;
      }
      size --;
      // Fetch the handle at the top of the stack
      DefaultHandle ret = elements[size];
      elements[size] = null;
      if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
      }
      // Reset the handle information
      ret.recycleId = 0;
      ret.lastRecycledId = 0;
      this.size = size;
      return ret;
    }

  // 6 Before reusing a ByteBuf, its previous indexes, marks, and reference count need to be reset
  final void reuse(int maxCapacity) {
      maxCapacity(maxCapacity);
      setRefCnt(1);
      setIndex0(0, 0);
      discardMarks();
    }	

In steps 1 through 6 above, we get the off-heap arena from the PoolThreadLocalCache and, for the size needed, a thread-local ByteBuf stack from the Recycler; we pop a ByteBuf off that stack and reset its read/write indexes and related state.
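To make the Recycler part more concrete, here is a minimal sketch of the same pattern used by PooledUnsafeDirectByteBuf.RECYCLER, with a made-up PooledThing class standing in for the ByteBuf (assuming Netty 4.1's io.netty.util.Recycler API):

import io.netty.util.Recycler;

public class PooledThing {
    private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            // Called only when the thread-local stack has no object to pop
            return new PooledThing(handle);
        }
    };

    private final Recycler.Handle<PooledThing> handle;
    private int value;

    private PooledThing(Recycler.Handle<PooledThing> handle) {
        this.handle = handle;
    }

    public static PooledThing newInstance(int value) {
        PooledThing thing = RECYCLER.get(); // pop from the thread-local stack, or create via newObject
        thing.value = value;                // "reuse": reset state, like ByteBuf.reuse() resets indexes
        return thing;
    }

    public void recycle() {
        value = 0;
        handle.recycle(this);               // push back onto the thread-local stack
    }
}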


Allocating the memory – pool/chunk/page/subPage

At this point we have traced step 2 of the code; next comes step 7.

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
      // newByteBuf(maxCapacity); There are two implementations, directArena and heapArena
      // Reuse a pooled ByteBuf via the Recycler
      PooledByteBuf<T> buf = newByteBuf(maxCapacity); // -- 2
      allocate(cache, buf, reqCapacity); // -- 7
      return buf;
    }

We have retrieved a ByteBuf from the Recycler's thread-local stack and reset its read/write indexes; what follows is the key part. Let's keep following the code.

	// 7 -- allocate(cache, buf, reqCapacity);
	// Allocation is split by size; only the tiny/small branch (< 8192) is shown below, while normal (8192..16m) and huge (> 16m) are handled later in the same method
	private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) 	{
    		// Calculate the size that should be applied
        final int normCapacity = normalizeCapacity(reqCapacity); // -- 8

        // The size of the application is smaller than one page (default: 8192)
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            // reqCapacity < 512 
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512 is tiny
                // Apply for tiny capacity space
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    return;
                }
                // Calculate which subpage tiny belongs to. Tiny is in 16B
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                //8192 > reqCapacity >= 512 is small
                // small The unit is 1024
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            // head points to its position in the table
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                // Check whether subPage has been added
                // If yes, operate directly on the subPage, record the identifier, etc
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    // Subscript in the bitmap of subPage
                    long handle = s.allocate();
                    assert handle >= 0;
                    // Initialize the information in subPage with the initialized bytebuf
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    / / count
                    incTinySmallAllocation(tiny);
                    return;
                }
            }

            // The first time a ByteBuf of this size is allocated, a new subPage needs to be created
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            // Add count
            incTinySmallAllocation(tiny);
            return;
        }
    }

Calculating the size that should actually be allocated for the ByteBuf

    // The following code is in normalizeCapacity(reqCapacity)
    // If reqCapacity >= 512, use the same capacity expansion algorithm as hashMap
    // reqCapacity < 512(tiny type) changes reqCapacity to a multiple of 16
	if (!isTiny(reqCapacity)) {
    // Find the smallest power of 2 that is not less than the original number
    int normalizedCapacity = reqCapacity;
    normalizedCapacity --;
    normalizedCapacity |= normalizedCapacity >>>  1;
    normalizedCapacity |= normalizedCapacity >>>  2;
    normalizedCapacity |= normalizedCapacity >>>  4;
    normalizedCapacity |= normalizedCapacity >>>  8;
    normalizedCapacity |= normalizedCapacity >>> 16;
    normalizedCapacity ++;

    // Guard against overflow: if the doubling overflowed to a negative value, halve it back
    if (normalizedCapacity < 0) {
      normalizedCapacity >>>= 1;
    }
    assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

    return normalizedCapacity;
  }

	// reqCapacity < 512
	// It is already a multiple of 16
	if ((reqCapacity & 15) == 0) {
    	return reqCapacity;
  	}
	// Instead of a multiple of 16, convert to a multiple of 16
	return (reqCapacity & ~15) + 16; 
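A standalone re-implementation of the same logic, just to illustrate the results (this copies the idea above and is not Netty's public API):

public class NormalizeSketch {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // Round up to the next power of two, HashMap-style
            int n = reqCapacity - 1;
            n |= n >>> 1;  n |= n >>> 2;  n |= n >>> 4;
            n |= n >>> 8;  n |= n >>> 16;
            return n + 1;
        }
        // tiny: round up to a multiple of 16
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    public static void main(String[] args) {
        System.out.println(normalize(100));  // 112  (tiny, rounded up to a multiple of 16)
        System.out.println(normalize(512));  // 512  (already a power of two)
        System.out.println(normalize(1000)); // 1024 (rounded up to the next power of two)
    }
}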


Because small and tiny are handled in a very similar way, let's look at the tiny path.

// Apply for tiny capacity space
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
  return;
}
// Calculate which subpage tiny belongs to. Tiny is in 16B
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;

// head points to its position in the table
final PoolSubpage<T> head = table[tableIdx];

The table used here is tinySubpagePools, an array of PoolSubpage head nodes, one slot per tiny size class. It is created and initialized in the PoolArena constructor:

tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
// Initialize 32 types of subPage heads
for (int i = 0; i < tinySubpagePools.length; i ++) {
  tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
// 512/16 = 32
static final int numTinySubpagePools = 512 >>> 4;

numTinySubpagePools = 512 >>> 4 = 32, because 512 is the boundary between tiny and small and tiny allocations go in steps of 16 bytes, so tinySubpagePools covers the sizes [16, 32, 48, ..., 512]. tinyIdx(int normCapacity) computes which size class a ByteBuf belongs to, i.e. its index into tinySubpagePools. The constructor only initializes the head nodes; allocations are never made on a head itself. Instead, a new subPage is created and linked with the head into a doubly linked list. Following that order, PoolSubpage (init/allocate) comes next; the sketch below illustrates the tinyIdx mapping first.
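Since tiny sizes are multiples of 16, the slot index is simply the normalized size divided by 16; a tiny sketch of that mapping (the helper name tinyIdx here just mirrors the method discussed above):

public class TinyIdxSketch {
    // Mirrors the idea of PoolArena.tinyIdx: one slot per 16-byte step below 512
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    public static void main(String[] args) {
        System.out.println(tinyIdx(16));  // 1
        System.out.println(tinyIdx(48));  // 3
        System.out.println(tinyIdx(496)); // 31, the last tiny slot (512 >>> 4 = 32 slots in total)
    }
}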


The purpose of some PoolSubpage fields

  final PoolChunk<T> chunk;
  // The index of the Page node where the current subPage resides
  private final int memoryMapIdx;
  // The offset of the current subpage's head in the chunk, in pageSize(default 8192)
  private final int runOffset;
  // default 8192
  private final int pageSize;
  // Default length of 8 longs; a long has 64 bits, 8 * 64 = 512 bits, and 512 * 16 bytes (the minimum subPage allocation unit) = 8192 (one default page)
  // a page is divided into 512 16-byte units, and each unit is marked with a bit; a long has 64 bits, so a total of 512 / 64 = 8 longs is needed for the marker bits
  private final long[] bitmap;
  // The maximum number of elemSize-sized elements that fit in a page
  // maxNumElems = pageSize / elemSize
  private int maxNumElems;
  // How many elemSize-sized elements are still available
  private int numAvail;
  // The number of long words of the bitmap actually used, since not every element size needs the full bitmap: bitmapLength = maxNumElems >>> 6 (plus one if there is a remainder)
  private int bitmapLength;
  // init() only initializes the first bitmapLength long words:
  /**
   * for (int i = 0; i < bitmapLength; i++) {
   *     bitmap[i] = 0;
   * }
   */


For an 8192-byte page, init() uses the element size passed in to compute the maximum number of elements of that size the page can hold (maxNumElems), then the number of long words needed to mark them (bitmapLength), and finally initializes those words of the bitmap. The bitmap is what marks which units of the page are already in use (in 16-byte granularity).
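A small sketch of that arithmetic for a hypothetical element size of 32 bytes (pageSize and the >>> 6 step come from the fields above; the extra +1 covers a remainder when maxNumElems is not a multiple of 64):

public class SubpageBitmapMath {
    public static void main(String[] args) {
        int pageSize = 8192;
        int elemSize = 32;                     // hypothetical element size

        int maxNumElems = pageSize / elemSize; // 256 elements fit in one page
        int bitmapLength = maxNumElems >>> 6;  // 256 / 64 = 4 long words needed
        if ((maxNumElems & 63) != 0) {
            bitmapLength++;                    // partial last word
        }

        System.out.println(maxNumElems);  // 256
        System.out.println(bitmapLength); // 4 (out of the 8 words the bitmap array always has)
    }
}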

Another important method in PoolSubpage is toHandle(), which packs memoryMapIdx and bitmapIdx together into a single long. From this handle value you can recover the corresponding node (from memoryMapIdx) and the offset within that node/page (bitmapIdx times the element size).

  private long toHandle(int bitmapIdx) {
        // The (int)handle is then used to change the handle value back to the memoryMapIdx, the index of the owning node
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
  }
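A sketch of packing and unpacking such a handle (the decode side mirrors how the (int) cast mentioned in the comment recovers memoryMapIdx; the constant 0x4000000000000000L marks the handle as belonging to a subPage):

public class HandlePackingSketch {
    static long toHandle(int memoryMapIdx, int bitmapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

    public static void main(String[] args) {
        long handle = toHandle(2048, 5);            // hypothetical node index and bitmap index

        int memoryMapIdx = (int) handle;            // low 32 bits: the node (page) index
        int bitmapIdx = (int) (handle >>> 32);      // high 32 bits: bitmap index plus the 0x40000000 marker

        System.out.println(memoryMapIdx);           // 2048
        System.out.println(bitmapIdx & 0x3FFFFFFF); // 5, after masking off the marker bit
    }
}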

Now that the subPage fields have been covered, let's follow up on the code above:

This code runs after the head node for the requested size class has been obtained. If a subPage has already been linked to the head, the allocation is served directly from that subPage and the relevant bookkeeping is recorded; otherwise allocateNormal(buf, reqCapacity, normCapacity) is called to create a new subPage.

  synchronized (head) {
    final PoolSubpage<T> s = head.next;
    // Check whether subPage has been added
    // If yes, operate directly on the subPage, record the identifier, etc
    if (s != head) {
      assert s.doNotDestroy && s.elemSize == normCapacity;
      // The handle packs the element's index in the subPage's bitmap together with the node index
      long handle = s.allocate();
      assert handle >= 0;
      // Update information in subPage with initialized bytebuf
      s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
      / / count
      incTinySmallAllocation(tiny);
      return;
    }
  }

The initBufWithSubpage method can be traced as follows:

buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
            reqCapacity, subpage.elemSize, arena.parent.threadCache());

  • runOffset(memoryMapIdx): memoryMapIdx is the index of the node, and runOffset is that node's offset within the chunk, in units of 8192
  • (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize: the offset, within the subPage, of the element at index bitmapIdx
  • offset: the offset of the chunk itself

The sum of these three offsets is the exact position, within the entire cache pool, of the element identified by bitmapIdx.
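A worked example with made-up numbers to tie the three parts together: suppose the subPage sits on the page whose runOffset is 3 pages into the chunk, the element size is 32 bytes, the bitmapIdx is 5, and the chunk itself starts at offset 0.

public class OffsetSketch {
    public static void main(String[] args) {
        int pageSize = 8192;
        int runOffsetInPages = 3;            // hypothetical: 4th page of the chunk
        int elemSize = 32;                   // hypothetical element size
        int bitmapIdx = 5;                   // hypothetical index inside the subPage
        int chunkOffset = 0;                 // offset of the chunk itself

        int runOffset = runOffsetInPages * pageSize;          // 24576
        int elemOffset = (bitmapIdx & 0x3FFFFFFF) * elemSize; // 160
        int absolute = runOffset + elemOffset + chunkOffset;  // 24736

        System.out.println(absolute); // where this element's bytes start inside the chunk's memory
    }
}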