“This is the 15th day of my participation in the First Challenge 2022. For details: First Challenge 2022”

preface

In order to meet its own high throughput requirements, Netty has further enhanced the encapsulation of ThreadLocal. This paper will analyze its specific encapsulation FastThreadLocal

Encapsulation of Runnable

This is mainly done by cleaning up ThreadLocal in finally to prevent memory leaks

final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    @Override
    public void run(a) {
        try {
            runnable.run();
        } finally {
          	// Clean up hereFastThreadLocal.removeAll(); }}}Copy the code

rightThreadThe encapsulation

Thread has been enhanced by adding a runnable wrapper tag and a custom ThreadLocal attribute.

If a Runnable is passed in the constructor, it is wrapped as FastThreadLocalRunnable and cleanupFastThreadLocals is set to true

public class FastThreadLocalThread extends Thread {
    // indicates whether runnable is encapsulated, and if so, we clean it up at the end of the run method
    private final boolean cleanupFastThreadLocals;

    // Custom ThreadLocal
    private InternalThreadLocalMap threadLocalMap;
}
Copy the code

FastThreadLocal

public class FastThreadLocal<V> {
    // The subscript used for the delete operation, pointing to 0
    // Atomic operation
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
  
    // Specify the current index, which is assigned in the constructor function
    private final int index;
    
    // Assign a specific subscript
    public FastThreadLocal(a) { index = InternalThreadLocalMap.nextVariableIndex(); }}Copy the code

Get the subscript operation inside InternalThreadLocalMap

// Atom definition, pointing to the next subscript to be allocated
static final AtomicInteger nextIndex = new AtomicInteger();

// Assign subscripts
public static int nextVariableIndex(a) {
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}
Copy the code

Initialization operation

The first element of the array is a Set

> that stores all FastThreadLocal associated with the map

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
      // The default implementation is null
      v = initialValue();
    } catch (Exception e) {
      PlatformDependent.throwException(e);
    }
		
    // Set the value of the current index to the initial value
    threadLocalMap.setIndexedVariable(index, v);
    // Map corresponds to the current FastThreadLocal, and the current FastThreadLocal is recorded in the map
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}
Copy the code

Get operation

public final V get(a) {
    // Get the current index
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if(v ! = InternalThreadLocalMap.UNSET) {return (V) v;
    }

    // If UNSET, initialize
    V value = initialize(threadLocalMap);
    // Record subsequent subscripts that need to be cleaned through BitSet
    registerCleaner(threadLocalMap);
    return value;
}
Copy the code

Set operations

public final void set(V value) {
    if(value ! = InternalThreadLocalMap.UNSET) { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();// setKnownNotUnset adds the FastThreadLocal association
        if (setKnownNotUnset(threadLocalMap, value)) {
            // If a new ThreadLocal is associated, the cleanup is registeredregisterCleaner(threadLocalMap); }}else {
        // If set is UNSET, clearing is requiredremove(); }}Copy the code

The remove operation

public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
      return;
    }
  
    // Set UNSET to return the overwritten value. Note that the overwritten value may also be UNSET
    Object v = threadLocalMap.removeIndexedVariable(index);
    // Perform cleanup operations
    removeFromVariablesToRemove(threadLocalMap, this);

    // Call the hook function
    if(v ! = InternalThreadLocalMap.UNSET) {try {
            onRemoval((V) v);
        } catch(Exception e) { PlatformDependent.throwException(e); }}}Copy the code

Cleaning up associations

When a FastThreadLocal thread is set to UNSET, its relationship with the FastThreadLocalMap needs to be cleaned up

private static void removeFromVariablesToRemove( InternalThreadLocalMap threadLocalMap, FastThreadLocal
        variable) {

    // Get the Set relation
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    // If it is empty, no processing
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        return;
    }

    @SuppressWarnings("unchecked")Set<FastThreadLocal<? >> variablesToRemove = (Set<FastThreadLocal<? >>) v;// Remove the current FastThreadLocal from the Set
    variablesToRemove.remove(variable);
}
Copy the code

RemoveAll operation

Called when Runnable exits

public static void removeAll(a) {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
      // 1. Obtain the association relation Set
      Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
      if(v ! =null&& v ! = InternalThreadLocalMap.UNSET) {@SuppressWarnings("unchecked")Set<FastThreadLocal<? >> variablesToRemove = (Set<FastThreadLocal<? >>) v; FastThreadLocal<? >[] variablesToRemoveArray = variablesToRemove.toArray(new FastThreadLocal[0]);
        
        // 2. Delete the FastThreadLocal and its index values one by one
        for(FastThreadLocal<? > tlv: variablesToRemoveArray) { tlv.remove(threadLocalMap); }}}finally {
        // 3. Delete the map of the current threadInternalThreadLocalMap.remove(); }}Copy the code

InternalThreadLocalMap

Inheritance in UnpaddedInternalThreadLocalMap

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    public static final Object UNSET = new Object();
}
Copy the code

UnpaddedInternalThreadLocalMap mainly for its important attribute definition

class UnpaddedInternalThreadLocalMap {
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    static final AtomicInteger nextIndex = new AtomicInteger();

    /** Used by {@link FastThreadLocal} */
    Object[] indexedVariables;
  
    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables; }}Copy the code

The first element of the array indexedVariables is the reserved element

It will also be deducted when calculating size

The basic information

Initial size 32 Minimum size 8

Capacity expansion mechanism: Double capacity expansion and ensure that it is a power of 2

An internal static variable holds the instance into ThreadLocal

static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
Copy the code

Gets the InternalThreadLocalMap instance method

Both methods determine whether the current Thread instance is enhanced as a FastThreadLocalThread, and if so, get the threadLocalMap instance directly from it, or from the static ThreadLocal attribute

In fact, most of the internal operations are going to make this judgment

public static InternalThreadLocalMap getIfSet(a);
public static InternalThreadLocalMap get(a);
Copy the code

Byte filling

There’s a paragraph in the code that looks like this

// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
Copy the code

This section is actually used for byte padding to avoid cache invalidation

For details, see About FastThreadLocal Cache Line padding???? · Issue #9284 · Netty /netty (github.com)

The old version of the class had 128 bytes, just enough for the cache size, but the new version added two variables that made it 136 bytes, but forgot to reduce the padding

Initialization operation

The initialization code is as follows

// Used instead of null
public static final Object UNSET = new Object();

// constructor
private InternalThreadLocalMap(a) {
    super(newIndexedVariableTable());
}

// Initialize the size to 32 and fill UNSET
private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
}
Copy the code

The map’s own get set and remove methods

public static InternalThreadLocalMap getIfSet(a) {
    // Get directly, may return null
}

public static InternalThreadLocalMap get(a) {
    // If the value is null, create a new one
}

// Delete the map from thread or threadLocal
public static void remove(a) {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        ((FastThreadLocalThread) thread).setThreadLocalMap(null);
    } else{ slowThreadLocalMap.remove(); }}Copy the code

Get operation

Get looks up the array directly, o(1)

public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}
Copy the code

Set operations

Normal set function

// True is returned when a new threadLocal is created
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        // If there is enough space, set it directly
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        // If there is not enough space, set it again
        expandIndexedVariableTableAndSet(index, value);
        return true; }}Copy the code

Function to expand and set

private void expandIndexedVariableTableAndSet(int index, Object value) {
  Object[] oldArray = indexedVariables;
  final int oldCapacity = oldArray.length;
  
  // Make sure it is 2^n after expansion
  int newCapacity = index;
  newCapacity |= newCapacity >>>  1;
  newCapacity |= newCapacity >>>  2;
  newCapacity |= newCapacity >>>  4;
  newCapacity |= newCapacity >>>  8;
  newCapacity |= newCapacity >>> 16;
  newCapacity ++;

  // Copy the old array
  Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
  // The fill space is UNSET
  Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
  / / set the new value
  newArray[index] = value;
  indexedVariables = newArray;
}
Copy the code

The remove operation

If the subscript is within the range, the removed value is returned when set to UNSET; otherwise, UNSET is returned

public Object removeIndexedVariable(int index) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
      Object v = lookup[index];
      lookup[index] = UNSET;
      return v;
    } else {
      returnUNSET; }}Copy the code

The end of the