Netty Source Code: Memory Management (3) (4.1.44)
Overview
This is the third article on Netty memory management. The previous two articles focused on ByteBuf objects and on how memory is allocated and managed: for example, how Netty uses divide-and-conquer to create multiple PoolArenas that serve allocations, and how a full binary tree data structure manages allocation and reclamation inside a chunk. One important piece was left out, however: jemalloc borrows the thread-local cache idea from tcmalloc, and this article walks through the source code to explain how Netty uses a local thread cache to further improve allocation efficiency.
Memory allocation entry
Recall the entry point for pooled memory allocation, PooledByteBufAllocator. Inside it there is a field that is central to today's topic: PoolThreadLocalCache threadCache. It extends FastThreadLocal, so it is a thread-local variable. Each thread can (not always; the default configuration can be changed so that caches are not used) fetch a PoolThreadCache object from this thread-local variable, and that object is where each thread actually caches memory blocks. The picture, briefly: PooledByteBufAllocator holds a PoolThreadLocalCache (which plays the same role as a ThreadLocal, but is Netty's FastThreadLocal, optimized for FastThreadLocalThread), so each thread gets its own private PoolThreadCache. A thread first tries to allocate from its PoolThreadCache; only if the cache has no matching memory block does the allocation fall through to the PoolArena. This inserts a buffer layer between the thread and the PoolArena, further increasing allocation efficiency. When a thread is done with a block of memory, it does not return it directly to the PoolChunk; instead the block is stored in the PoolThreadCache. Does PoolThreadCache cache every block ever handed to the thread, forever? No need to worry: once the number of allocations served from the cache exceeds a threshold (freeSweepAllocationThreshold, default: 8192), a release action is triggered that returns the surplus cached memory to the PoolArena. A minimal user-level sketch of this flow is shown below; after that, before reading the source code, let's take a look at the classes involved.
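The sketch is illustrative only, assuming the default configuration (a FastThreadLocalThread always gets a working cache); the class names are real Netty 4.1 API, and the comments describe the internal behavior just discussed:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocalThread;

public class ThreadCacheFlowDemo {
    public static void main(String[] args) throws InterruptedException {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
        Thread t = new FastThreadLocalThread(() -> {
            // First allocation: this thread's PoolThreadCache is still empty,
            // so the block comes from the PoolArena bound to the thread.
            ByteBuf buf = alloc.directBuffer(4096);
            // Releasing does not return the block to the PoolChunk:
            // PoolThreadCache intercepts and caches it.
            buf.release();
            // Same normalized size again: served from the thread-local cache,
            // without touching the arena.
            ByteBuf buf2 = alloc.directBuffer(4096);
            buf2.release();
        });
        t.start();
        t.join();
    }
}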
The related classes
PoolThreadLocalCache
This class extends FastThreadLocal (see the earlier article on FastThreadLocal to learn more about it). Its important initialValue() method does essentially one thing: each thread is bound to exactly one PoolArena pair (specifically, one heapArena and one directArena) and deals only with those arenas for its entire lifetime. This is another embodiment of jemalloc's divide-and-conquer idea, which improves multi-threaded allocation performance. Picking the PoolArena is straightforward: compare the number of threads bound to each PoolArena and bind the current thread to the least used one. The source code is as follows:
// io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    }

    // Called the first time the "FastThreadLocal" value is accessed by a thread.
    // Used here to bind the heapArena and directArena objects and build the cache.
    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        final Thread current = Thread.currentThread();
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
            final PoolThreadCache cache = new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

            if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
                final EventExecutor executor = ThreadExecutorMap.currentExecutor();
                if (executor != null) {
                    executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
                            DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                }
            }
            return cache;
        }
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }

    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free(false);
    }

    // Compare the numThreadCaches of each PoolArena and select the least used one to bind to
    private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }

        PoolArena<T> minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }
        return minArena;
    }
}
PoolThreadLocalCache’s generic type is PoolThreadCache, so you can get the PoolThreadCache object (PoolThreadLocalCache#get()), which is where the memory information is stored.
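One more note on initialValue() above: whether an ordinary thread (not a FastThreadLocalThread) gets a working cache is controlled by useCacheForAllThreads, which is backed by the io.netty.allocator.useCacheForAllThreads system property. A minimal sketch of turning it off (the property is read once when PooledByteBufAllocator is first loaded, so it must be set beforehand):

public class DisableCacheForPlainThreads {
    public static void main(String[] args) {
        // Usually passed as -Dio.netty.allocator.useCacheForAllThreads=false on the command line;
        // setting it here only works because PooledByteBufAllocator has not been loaded yet.
        System.setProperty("io.netty.allocator.useCacheForAllThreads", "false");

        // main() runs on a plain thread, so it now receives the "all sizes 0" PoolThreadCache
        // from the else-branch of initialValue(); FastThreadLocalThreads (e.g. event loop
        // threads) still get a real cache.
        io.netty.buffer.ByteBuf buf = io.netty.buffer.PooledByteBufAllocator.DEFAULT.buffer(256);
        buf.release();
    }
}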
PoolThreadCache
This object is the highlight of the day: it is the core class for caching memory. We looked at its fields before, so here is a review. heapArena and directArena are initialized in PoolThreadLocalCache#initialValue(), as we saw in the previous section. The remaining fields are classified along two axes. The first is the data container type: there are two containers, byte[] (heap) and ByteBuffer (direct). The second is the memory size class: tiny, small, and normal blocks are cached, while huge is ignored. Another point worth noting is the default length of each cache array. Where do these lengths come from? They follow from the size-class table (with chunkSize defaulting to 16MB): the tiny array has length 32 (tiny sizes run from 16B to 496B in 16B steps, and index 0 is unused, which is the "extra zero" that brings the count to 32), the small array has length 4 (512B, 1KB, 2KB, 4KB), and the normal array has length 3. These are only the default lengths; they vary with the pageSize/chunkSize configuration but do not change at runtime. Note that the normal array has length 3 because PoolThreadCache caches only 8KB, 16KB, and 32KB blocks; anything larger than 32KB is returned directly to the PoolArena. The relationship between the specification values and the array indexes is sketched right below, and the fields themselves are explained after that.
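The sketch below reconstructs that relationship in code. tinyIdx and smallIdx mirror PoolArena's static helpers; normalIdx is a simplified, hypothetical equivalent of the shift logic in PoolThreadCache#cacheForNormal, assuming the default pageSize of 8192:

public final class CacheIndexDemo {
    // 16B granularity: 16 -> 1, 32 -> 2, ..., 496 -> 31 (index 0 is unused)
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    // 512 -> 0, 1024 -> 1, 2048 -> 2, 4096 -> 3
    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10;
        while (i != 0) {
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }

    // 8KB -> 0, 16KB -> 1, 32KB -> 2, i.e. log2(normCapacity) - log2(8192)
    static int normalIdx(int normCapacity) {
        return Integer.numberOfTrailingZeros(normCapacity) - 13;
    }

    public static void main(String[] args) {
        System.out.println(tinyIdx(496));         // 31
        System.out.println(smallIdx(4096));       // 3
        System.out.println(normalIdx(32 * 1024)); // 2
    }
}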
// io.netty.buffer.PoolThreadCache
/**
 * Thread-local object that caches memory block information.
 */
final class PoolThreadCache {
    // Each thread is bound to one "heapArena" and one "directArena" object
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Heap caches (byte[] container), grouped by size class
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;

    // Direct caches (ByteBuffer container), grouped by size class
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used to calculate the index offset for the normal caches
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;

    // Threshold that triggers the release of some cached memory blocks
    private final int freeSweepAllocationThreshold;

    // Whether this PoolThreadCache has been released
    private final AtomicBoolean freed = new AtomicBoolean();

    // Number of allocations served from the local thread cache;
    // reset to zero once it exceeds freeSweepAllocationThreshold
    private int allocations;

    // ...
}
PoolThreadCache defines the rules for caching memory blocks. Similar to PoolArena, it uses arrays to hold cached memory information, where the array index corresponds to the size of the memory block. Each array element is a MemoryRegionCache object, a wrapper class that records the information of cached memory blocks. It is described in detail next.
MemoryRegionCache
MemoryRegionCache is the core class for recording cached memory information. The related core attributes are explained as follows
// io.netty.buffer.PoolThreadCache.MemoryRegionCache
private abstract static class MemoryRegionCache<T> {
    // Capacity of the cache queue
    private final int size;

    // Queue that stores the memory block information (wrapped in Entry objects).
    // This is a multi-producer, single-consumer queue.
    private final Queue<Entry<T>> queue;

    // The memory size class managed by this MemoryRegionCache
    private final SizeClass sizeClass;

    // Number of successful allocations from this "MemoryRegionCache";
    // note this is a different counter from the one in "PoolThreadCache"
    private int allocations;

    MemoryRegionCache(int size, SizeClass sizeClass) {
        // Initialization: round the capacity up to a power of two
        this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
        queue = PlatformDependent.newFixedMpscQueue(this.size);
        this.sizeClass = sizeClass;
    }
}
The internal size field is the capacity of the cache queue and depends on the size class: by default the tiny level uses 512 and the small level uses 256 (see the cache sizes in PooledByteBufAllocator; they are configurable). Queue<Entry<T>> is a multi-producer, single-consumer queue from JCTools. Memory block information (PoolChunk, nioBuffer, handle, and so on) is wrapped in an Entry object and placed in the queue; a later allocation can simply pop an Entry off the queue. allocations records the number of memory blocks successfully allocated from this MemoryRegionCache, and it comes into play when MemoryRegionCache frees some of its cached blocks.
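These queue capacities are tunable. A minimal sketch of an allocator with explicit cache sizes, assuming the 8-argument PooledByteBufAllocator constructor available in 4.1.44 (the values shown are simply the defaults):

import io.netty.buffer.PooledByteBufAllocator;

public class CustomCacheSizes {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(
                true,   // preferDirect
                2, 2,   // nHeapArena, nDirectArena
                8192,   // pageSize
                11,     // maxOrder: chunkSize = 8192 << 11 = 16MB
                512,    // tinyCacheSize: queue capacity of each tiny MemoryRegionCache
                256,    // smallCacheSize
                64);    // normalCacheSize
        System.out.println(alloc.metric());
    }
}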
Local threads reclaim memory blocks
Calling ByteBuf#release() decrements the reference count by 1. When the count reaches 0, the ByteBuf object needs to be reclaimed and the memory block it manages would be returned to the pool. However, PoolThreadCache intercepts the memory block before it re-enters the pool and puts it into the local thread cache for future use by the same thread. The source code analysis follows:
PoolArena#free
By DEBUGging we can land in PoolArena#free (the upstream call path is omitted here). It lets the local thread cache try to reclaim the block first, and only calls freeChunk() if caching fails.
// io.netty.buffer.PoolArena#free
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
          int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        // Let the local thread cache try to reclaim the block first
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }
        freeChunk(chunk, handle, sizeClass, nioBuffer, false);
    }
}
PoolThreadCache#add
PoolThreadCache is the thread-private local cache object. Its add() method does little more than look up the matching MemoryRegionCache object based on the size class and specification value, then delegate to it. If no match is found (for example, because the block exceeds the largest cacheable capacity), the block cannot be cached here and can only be reclaimed by PoolChunk.
// io.netty.buffer.PoolThreadCache#add
/**
 * The local cache tries to reclaim a memory block.
 *
 * @param area         the PoolArena the memory block belongs to
 * @param chunk        the PoolChunk the memory block belongs to
 * @param nioBuffer    the "ByteBuffer" object wrapping the memory block
 * @param handle       the handle value of the memory block
 * @param normCapacity the normalized capacity
 * @param sizeClass    the size class of the capacity
 * @return true: the local thread reclaimed the block successfully
 */
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer, long handle,
            int normCapacity, SizeClass sizeClass) {
    // #1 Get the "MemoryRegionCache" object based on the size class and specification value
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    // #2 No matching "MemoryRegionCache": this block cannot be cached
    if (cache == null) {
        return false;
    }
    // #3 Let the cache reclaim the block
    return cache.add(chunk, nioBuffer, handle);
}

// Obtain the MemoryRegionCache object based on the size class and specification value
// io.netty.buffer.PoolThreadCache#cache
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}
MemoryRegionCache#add
Ultimately, MemoryRegionCache hands the memory block information to its internal queue. The process is simple: wrap the information in an Entry object (Entry objects themselves come from an object pool), then enqueue it. When Queue#offer() fails, the Entry object must be reclaimed immediately; otherwise a memory leak may occur.
// io.netty.buffer.PoolThreadCache.MemoryRegionCache#add
/**
 * Wrap the memory block information and write it to the queue.
 *
 * @param chunk     the PoolChunk the memory block belongs to
 * @param nioBuffer may be null; if not, the ByteBuffer object is cached for the next use, reducing GC
 * @param handle    the memory handle
 * @return false: the queue is full, adding failed
 */
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
    // #1 Create an "Entry" object that wraps the details of the memory block
    Entry<T> entry = newEntry(chunk, nioBuffer, handle);
    // #2 Enqueue
    boolean queued = queue.offer(entry);
    if (!queued) {
        // If the Entry fails to join the queue, reclaim it immediately;
        // otherwise a memory leak will occur
        entry.recycle();
    }
    // #3 Return whether caching succeeded
    return queued;
}

// Construct an Entry object
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
    // #1 Fetch an Entry object from the object pool
    Entry entry = RECYCLER.get();
    // #2 Fill in the memory information
    entry.chunk = chunk;
    entry.nioBuffer = nioBuffer;
    entry.handle = handle;
    // #3 Return
    return entry;
}

// Create the object pool for Entry objects
@SuppressWarnings("rawtypes")
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    public Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
});
At this point, the entire logic of a local thread reclaiming a memory block has been covered. In essence, the memory block information is wrapped in an Entry object and written to the queue of the matching MemoryRegionCache in the corresponding array; each MemoryRegionCache maintains an internal queue in which the Entry objects are stored.
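As a recap of the whole life cycle, here is a deliberately simplified, single-threaded model of one MemoryRegionCache. It is an illustration only, not the Netty class: a plain ArrayDeque stands in for the JCTools MPSC queue, and a bare handle value stands in for the full Entry payload:

import java.util.ArrayDeque;
import java.util.Queue;

final class MiniRegionCache {
    private final Queue<Long> queue = new ArrayDeque<>(); // Netty: fixed-size MPSC queue of Entry
    private final int size;                               // queue capacity, e.g. 512 for tiny
    private int allocations;                              // cache hits since the last trim

    MiniRegionCache(int size) {
        this.size = size;
    }

    // Called when a ByteBuf is released: try to cache the block instead of freeing it
    boolean add(long handle) {
        return queue.size() < size && queue.offer(handle);
    }

    // Called on allocation: null means a cache miss, fall back to PoolArena
    Long allocate() {
        Long handle = queue.poll();
        if (handle != null) {
            allocations++;
        }
        return handle;
    }

    // Free the entries the thread did not re-use since the last trim
    void trim() {
        int free = size - allocations;
        allocations = 0;
        for (; free > 0 && queue.poll() != null; free--) {
            // here Netty returns the block to the arena via freeEntry()
        }
    }
}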
Attempt to allocate memory from cache
When memory is requested through the allocator, the local thread cache is tried first. The snippet below shows the tiny and small paths; the normal path works the same way. The relevant source code:
// io.netty.buffer.PoolArena#allocate
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            // Try the local thread cache first
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // Try the local thread cache first
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        // ...
    }
    // ...
}
PoolThreadCache#allocateTiny
PoolThreadCache#allocateTiny() tries to allocate a tiny-sized block from the cache. PoolThreadCache#allocateSmall() works exactly the same way, so it is not repeated here.
// Try allocating tiny-level memory and initialize the PooledByteBuf object
// io.netty.buffer.PoolThreadCache#allocateTiny
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

/**
 * Get the corresponding MemoryRegionCache object based on the data container and specification value
 */
// io.netty.buffer.PoolThreadCache#cacheForTiny
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    // #1 Determine the array index based on the specification value
    int idx = PoolArena.tinyIdx(normCapacity);
    // #2 Pick the array according to the data container type
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

// Return the MemoryRegionCache object at the given index
// io.netty.buffer.PoolThreadCache#cache
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}
PoolThreadCache#allocate
This method simply delegates the allocation to the MemoryRegionCache object, then checks whether the total number of allocations served by this PoolThreadCache has exceeded the threshold. If so, all MemoryRegionCache arrays are trimmed.
// io.netty.buffer.PoolThreadCache#allocate
/**
 * If MemoryRegionCache is null, the local thread cache failed to match; return false.
 */
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    // Delegate the allocation to MemoryRegionCache
    boolean allocated = cache.allocate(buf, reqCapacity);
    // If the number of allocations exceeds the threshold, some cached memory must be cleaned up
    if (++allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        // Clean up
        trim();
    }
    // Return the allocation result
    return allocated;
}
MemoryRegionCache#allocate
An Entry object is popped from the queue. If it is null, the cache has nothing suitable and the allocation fails with false. Otherwise, initBuf() initializes the ByteBuf object with the cached memory block, and the used Entry object is recycled.
// io.netty.buffer.PoolThreadCache.MemoryRegionCache#allocate
/**
 * Allocate a cached memory block and initialize the ByteBuf object.
 *
 * @param buf         the raw ByteBuf object, not yet backed by any memory block
 * @param reqCapacity the capacity actually requested by the user
 * @return whether allocation from the cache succeeded
 */
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    // #1 Get an "Entry" object from the queue
    Entry<T> entry = queue.poll();
    // #2 The queue is empty: allocation from the cache fails
    if (entry == null) {
        return false;
    }
    // #3 Write the memory information into the ByteBuf object
    initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
    // #4 Recycle the used Entry object back into the object pool
    entry.recycle();
    // #5 Allocation count +1; when it exceeds the threshold a cleanup is triggered
    ++allocations;
    // #6 Allocation from the cache succeeded
    return true;
}
The initBuf() call in step #3 is an abstract method: MemoryRegionCache is an abstract class with two subclasses, SubPageMemoryRegionCache and NormalMemoryRegionCache, and each simply implements initBuf to call a different initialization method on PoolChunk. The main difference in the ByteBuf initialization logic is how offset and maxLength are computed; everything else is the same. The relevant source code is as follows:
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
    SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
        super(size, sizeClass);
    }

    @Override
    protected void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
                           PooledByteBuf<T> buf, int reqCapacity) {
        chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
    }
}

private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
    NormalMemoryRegionCache(int size) {
        super(size, SizeClass.Normal);
    }

    @Override
    protected void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
                           PooledByteBuf<T> buf, int reqCapacity) {
        chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
    }
}

// io.netty.buffer.PoolChunk#initBuf
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
        reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());

// io.netty.buffer.PoolChunk#initBufWithSubpage()
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;

    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;

    buf.init(
        this, nioBuffer, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
        reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
Free a block of memory in the cache
We should return part of the cached memory at an appropriate time; the cache cannot be monopolized by one thread, since other threads also need memory blocks. As mentioned earlier, freeing cached blocks is triggered when the number of cache allocations exceeds the free threshold (freeSweepAllocationThreshold, default: 8192).
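Both triggers can be adjusted via system properties. A hedged sketch: the property names io.netty.allocator.cacheTrimInterval (allocation-count threshold, default 8192) and io.netty.allocation.cacheTrimIntervalMillis (period of the scheduled trimTask, default 0 = disabled) are taken from the 4.1.44 sources, and both must be set before PooledByteBufAllocator is loaded:

public class TrimConfigDemo {
    public static void main(String[] args) {
        // Trigger by allocation count: trim after every 4096 cache hits instead of 8192
        System.setProperty("io.netty.allocator.cacheTrimInterval", "4096");
        // Trigger by time: additionally run the scheduled trimTask every 10 seconds
        System.setProperty("io.netty.allocation.cacheTrimIntervalMillis", "10000");

        // Touch the allocator only after the properties are set
        io.netty.buffer.ByteBuf buf = io.netty.buffer.PooledByteBufAllocator.DEFAULT.buffer(128);
        buf.release();
    }
}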
// Release part of the memory in all "MemoryRegionCache" arrays
// io.netty.buffer.PoolThreadCache#trim()
void trim() {
    trim(tinySubPageDirectCaches);
    trim(smallSubPageDirectCaches);
    trim(normalDirectCaches);
    trim(tinySubPageHeapCaches);
    trim(smallSubPageHeapCaches);
    trim(normalHeapCaches);
}

private static void trim(MemoryRegionCache<?>[] caches) {
    if (caches == null) {
        return;
    }
    // Walk the array and trim the caches one by one
    for (MemoryRegionCache<?> c : caches) {
        trim(c);
    }
}

private static void trim(MemoryRegionCache<?> cache) {
    if (cache == null) {
        return;
    }
    // Let MemoryRegionCache do the cleanup
    cache.trim();
}
MemoryRegionCache#trim
It simply computes the maximum number of entries to release: size minus the allocations served since the last trim. For example, a tiny-level cache with size 512 that served only 300 allocations since the last trim will free up to 212 surplus entries.
/*
 * trim() is called in two situations:
 * (1) the trimTask scheduled task, which is disabled by default and can be enabled by
 *     setting io.netty.allocation.cacheTrimIntervalMillis (in TimeUnit.MILLISECONDS);
 * (2) when the allocation count reaches freeSweepAllocationThreshold (default: 8192).
 */
// io.netty.buffer.PoolThreadCache.MemoryRegionCache#trim
public final void trim() {
    // #1 Compute how many cached entries were not re-used since the last trim
    int free = size - allocations;
    allocations = 0;

    // If free > 0, there is surplus data in the queue that should be reclaimed.
    // Not all of the queue is released: "free" is the upper limit.
    if (free > 0) {
        free(free, false);
    }
}
MemoryRegionCache#free(int, boolean)
At most max Entry objects are reclaimed.
// io.netty.buffer.PoolThreadCache.MemoryRegionCache#free
// Free cached memory blocks
private int free(int max, boolean finalizer) {
    int numFreed = 0;
    // Loop over the queued memory blocks
    for (; numFreed < max; numFreed++) {
        // Pop an Entry
        Entry<T> entry = queue.poll();
        if (entry != null) {
            // Reclaim it
            freeEntry(entry, finalizer);
        } else {
            // The queue has been fully cleared
            return numFreed;
        }
    }
    return numFreed;
}
MemoryRegionCache#freeEntry
This method uses the finalizer flag to decide whether the Entry object should be recycled back into the object pool. The flag indicates whether the call originates from Object#finalize(); if true, the thread is being destroyed, and PoolThreadCache#finalize() will reclaim all thread-private data such as Entry and ObjectPool, which are destroyed automatically along with the thread. If the call is not from a finalizer, entry.recycle() must be invoked: otherwise the Entry would keep a strong reference to the PoolChunk, preventing it from being garbage collected and causing a memory leak. This is my personal understanding; please point out any mistakes.
// io.netty.buffer.PoolThreadCache.MemoryRegionCache#freeEntry
/**
 * Release the memory block and return it to PoolChunk.
 *
 * @param entry     the object that encapsulates the memory block information
 * @param finalizer whether this is called from the Object#finalize() method
 */
private void freeEntry(Entry entry, boolean finalizer) {
    // #1 Extract the memory information
    PoolChunk chunk = entry.chunk;
    long handle = entry.handle;
    ByteBuffer nioBuffer = entry.nioBuffer;

    if (!finalizer) {
        // Recycle the Entry object so that the PoolChunk it references can be GC'd later.
        // This must not be done inside the Object#finalize() method.
        entry.recycle();
    }

    // #2 Return the memory block to the arena
    chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
}
Conclusion
Netty prepares a PoolThreadCache for each thread. When a request for memory arrives, if the data container type and size match a MemoryRegionCache, the block is allocated from the local cache: an Entry is popped straight off the queue and used to initialize the ByteBuf object; otherwise the PoolArena performs the allocation. When a ByteBuf frees its memory, the block information is not returned to the PoolChunk immediately; instead PoolThreadCache caches it locally, wrapping it in an Entry object and putting it on the queue for the next allocation. PoolThreadCache does not blindly cache every block the thread releases: once the number of cache allocations exceeds the freeSweepAllocationThreshold (default: 8192), a local cache reclamation is triggered that returns part (or all) of the cached blocks, depending on how many were actually re-used.