Welcome to the official account [sharedCode], dedicated to source code analysis of mainstream middleware. Personal website: www.shared-code.com/

Preface

At the end of our previous article, Multithreading: Parent-Child Thread Context Delivery, we saw that the InheritableThreadLocal provided by the JDK does not work well with thread pools: because pooled threads are reused, the value a task reads is probably not the one we want. In this article I would like to introduce an open source component from Alibaba that handles this case well.

TransmittableThreadLocal

In general, ThreadLocal works fine. The trouble starts when we need to pass ThreadLocal values into an execution component that reuses pooled threads, such as a thread pool.

This is the scenario that TransmittableThreadLocal is designed to solve.
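To make the problem concrete, here is a minimal sketch that uses only the JDK. The class name StaleInheritableDemo and the values request-A and request-B are made up for illustration. The single pool thread is created when the first task is submitted, so it inherits the value that was current at that moment and never sees later changes made by the parent:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleInheritableDemo {
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    // Returns what the two pooled tasks observed, in submission order.
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> readContext = CONTEXT::get;
        List<String> seen = new ArrayList<>();
        try {
            CONTEXT.set("request-A");
            // The pool thread is created here and inherits "request-A".
            seen.add(pool.submit(readContext).get());

            CONTEXT.set("request-B");
            // The same thread is reused, so it still holds the inherited value.
            seen.add(pool.submit(readContext).get());
            return seen;
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [request-A, request-A]
    }
}
```

The second task was submitted while the parent held request-B, yet it reads request-A. Inheritance happens once, at thread creation, not at task submission.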

Github address: github.com/alibaba/tra…

If you are interested, you can download it and try it out. Let's look at what makes this component special.

Let's start with a demo and see how it behaves.

Demo

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * TTL test
 *
 * @author zhangyunhe
 * @date 2020-04-23
 */
public class Test {

    // 1. Initialize a TransmittableThreadLocal, which extends InheritableThreadLocal
    static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();

    // Initialize a thread pool with a single thread
    static ExecutorService poolExecutor = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Test test = new Test();
        test.test();
    }

    private void test() throws ExecutionException, InterruptedException {
        // Set the initial value
        local.set("The King of Heaven");
        // !!!! Note: this is where the Task is wrapped with TtlRunnable
        Future<?> future = poolExecutor.submit(TtlRunnable.get(new Task("Task 1")));
        future.get();

        Future<?> future2 = poolExecutor.submit(TtlRunnable.get(new Task("Task 2")));
        future2.get();

        System.out.println("Parent thread value:" + local.get());
        poolExecutor.shutdown();
    }

    class Task implements Runnable {

        String str;

        Task(String str) {
            this.str = str;
        }

        @Override
        public void run() {
            // Get the value
            System.out.println(Thread.currentThread().getName() + ":" + local.get());
            // Set a new value inside the task
            local.set(str);
        }
    }
}

Output result:

pool-1-thread-1:The King of Heaven
pool-1-thread-1:The King of Heaven
Parent thread value:The King of Heaven

Analysis of the principle

Let's start with the source of TransmittableThreadLocal:

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {

    // 1. The holder is the key design point; it is used later when the TtlRunnable is constructed
    private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }

                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

    @SuppressWarnings("unchecked")
    private void addThisToHolder() {
        if (!holder.get().containsKey(this)) {
            holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
        }
    }

    @Override
    public final T get() {
        T value = super.get();
        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
        return value;
    }

    /**
     * see {@link InheritableThreadLocal#set}
     */
    @Override
    public final void set(T value) {
        if (!disableIgnoreNullValueSemantics && null == value) {
            // may set null to remove value
            remove();
        } else {
            super.set(value);
            addThisToHolder();
        }
    }

    /**
     * see {@link InheritableThreadLocal#remove()}
     */
    @Override
    public final void remove() {
        removeThisFromHolder();
        super.remove();
    }

    private void superRemove() {
        super.remove();
    }
}

Step description:

  1. In the code, the author builds a holder object, which is an InheritableThreadLocal. It contains a WeakHashMap, whose keys are weakly referenced. The keys of this map are TransmittableThreadLocal instances and the values are always null.

For each thread, the holder therefore records every TransmittableThreadLocal that the thread is using.

  2. As you can see from the above, every get, set, and remove call also updates the holder, so that the holder always has a reference to every TransmittableThreadLocal in use.
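The registry idea in the two steps above can be sketched with JDK classes alone. This is a simplified illustration, not the library's code: TrackedLocal, HolderSketch, and registeredCount are hypothetical names, and unlike the real class this version registers only on set and does not touch get.

```java
import java.util.WeakHashMap;

class HolderSketch {
    // Static fields keep the instances strongly reachable, so the weak keys stay in the map.
    static final TrackedLocal<String> USER = new TrackedLocal<>();
    static final TrackedLocal<Integer> TENANT = new TrackedLocal<>();

    public static void main(String[] args) {
        USER.set("alice");
        TENANT.set(42);
        System.out.println(TrackedLocal.registeredCount()); // prints 2
        TENANT.remove();
        System.out.println(TrackedLocal.registeredCount()); // prints 1
    }
}

// Hypothetical, simplified stand-in for TransmittableThreadLocal.
class TrackedLocal<T> extends InheritableThreadLocal<T> {
    // Per-thread registry of the TrackedLocal instances used by that thread.
    // Only the keys matter; every value is null.
    private static final InheritableThreadLocal<WeakHashMap<TrackedLocal<?>, Object>> HOLDER =
            new InheritableThreadLocal<WeakHashMap<TrackedLocal<?>, Object>>() {
                @Override
                protected WeakHashMap<TrackedLocal<?>, Object> initialValue() {
                    return new WeakHashMap<>();
                }

                @Override
                protected WeakHashMap<TrackedLocal<?>, Object> childValue(
                        WeakHashMap<TrackedLocal<?>, Object> parentValue) {
                    // A child thread starts with its own copy of the parent's registry.
                    return new WeakHashMap<>(parentValue);
                }
            };

    @Override
    public void set(T value) {
        super.set(value);
        HOLDER.get().put(this, null); // register this instance for the current thread
    }

    @Override
    public void remove() {
        HOLDER.get().remove(this); // unregister first, then clear the value
        super.remove();
    }

    // Number of instances registered by the calling thread.
    static int registeredCount() {
        return HOLDER.get().size();
    }
}
```

With such a registry in place, code that needs "all context values of this thread" can iterate the map's keys instead of having to know each ThreadLocal by name.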

TtlRunnable

Let's go back to this line from the demo above:

Future future = poolExecutor.submit(TtlRunnable.get(new Task("Task 1")));

As you may have noticed, we called the TtlRunnable get method. Let's see what this method does (the one-argument call in the demo ends up in this overload):

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) return null;

        if (runnable instanceof TtlEnhanced) {
            // avoid redundant decoration, and ensure idempotency
            if (idempotent) return (TtlRunnable) runnable;
            else throw new IllegalStateException("Already TtlRunnable!");
        }
        // This is the key step
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }

Setting the details aside, the gist of the code above is that it builds a TtlRunnable object around the runnable that was passed in.

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        // This is the key step
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

Note that the following line still runs in the main thread, and it calls capture:

this.capturedRef = new AtomicReference<Object>(capture());

capture

public static Object capture() {
    // Build a snapshot object; the important part is the captureTtlValues method
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
  // Create a WeakHashMap to hold the snapshot
  WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  // Still in the main thread: iterate over every key in the holder and copy its current value
  for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
    ttl2Value.put(threadLocal, threadLocal.copyValue());
  }
  // Return the snapshot
  return ttl2Value;
}

Step description:

1. Read the static holder variable and loop over all of its keys. The way each value is obtained is the clever part:

private T copyValue() {
      // get() delegates to the superclass method, so it returns the value that this TransmittableThreadLocal holds in the current (parent) thread
      return copy(get());
}

2. Assemble the results into a WeakHashMap and return it. It ends up in the constructor shown above, where it is assigned to capturedRef.
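The capture step can be illustrated with a small JDK-only sketch. The names CaptureSketch, USER, TRACE, and REGISTRY are hypothetical, and a fixed list stands in for the per-thread holder. What it shows is that the snapshot is a copy taken at capture time, so later changes in the submitting thread do not affect it:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CaptureSketch {
    static final ThreadLocal<String> USER = new ThreadLocal<>();
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    // Hypothetical fixed registry; the library keeps this per thread in its holder.
    static final List<ThreadLocal<String>> REGISTRY = Arrays.asList(USER, TRACE);

    // Copy the calling thread's current value of every registered ThreadLocal.
    static Map<ThreadLocal<String>, String> capture() {
        Map<ThreadLocal<String>, String> snapshot = new HashMap<>();
        for (ThreadLocal<String> local : REGISTRY) {
            snapshot.put(local, local.get());
        }
        return snapshot;
    }

    static String demo() {
        try {
            USER.set("alice");
            TRACE.set("t-1");
            Map<ThreadLocal<String>, String> snapshot = capture();

            // A change made after the capture is not reflected in the snapshot.
            USER.set("bob");
            return snapshot.get(USER) + "/" + snapshot.get(TRACE);
        } finally {
            USER.remove();
            TRACE.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints alice/t-1
    }
}
```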

run

@Override
public void run() {
  // 1. Get the snapshot stored in capturedRef when the TtlRunnable was constructed; it holds the parent thread's values as of the moment the task was submitted
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after run!");
  }
  // Replay: remove keys that were not captured and set the captured values on the current (pool) thread
  Object backup = replay(captured);
  try {
    // Run the actual task
    runnable.run();
  } finally {
    // Restore the thread's previous values from the backup
    restore(backup);
  }
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      // Make a local backup of the current thread
      backup.put(threadLocal, threadLocal.get());

      // Remove data that is not captured.
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // Set the value to the TransmittableThreadLocal of the current thread.
    setTtlValuesTo(captured);

    // A hook
    doExecuteCallback(true);

    return backup;
}
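The capture, replay, and restore sequence can be reduced to a sketch built on a single plain ThreadLocal. It is loosely modeled on the steps above and is not the library's implementation: ReplayRestoreSketch, wrap, and setOrRemove are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ReplayRestoreSketch {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Hypothetical wrapper: capture in the submitting thread, replay and restore in the pool thread.
    static Runnable wrap(Runnable task) {
        final String captured = CONTEXT.get();   // capture: runs in the thread that calls wrap
        return () -> {
            String backup = CONTEXT.get();       // backup: what the pool thread held before
            setOrRemove(captured);               // replay the captured value
            try {
                task.run();
            } finally {
                setOrRemove(backup);             // restore, even if the task changed the value
            }
        };
    }

    private static void setOrRemove(String value) {
        if (value == null) {
            CONTEXT.remove();
        } else {
            CONTEXT.set(value);
        }
    }

    static List<String> demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<String> seen = new ArrayList<>();
        try {
            CONTEXT.set("parent-1");
            pool.submit(wrap(() -> {
                seen.add(CONTEXT.get());
                CONTEXT.set("changed-by-task");  // undone by the restore step
            })).get();

            CONTEXT.set("parent-2");
            pool.submit(wrap(() -> seen.add(CONTEXT.get()))).get();

            // An unwrapped task shows what the pool thread holds after restore.
            Runnable probe = () -> seen.add(String.valueOf(CONTEXT.get()));
            pool.submit(probe).get();
            return seen;
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [parent-1, parent-2, null]
    }
}
```

Each wrapped task sees the value its submitter held at wrap time, and the value a task sets does not leak into the next task that reuses the thread.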

Conclusion:

After reading this much source code, some readers may feel a little lost, and my explanation may have been a bit scattered, so here is a summary.

1. TransmittableThreadLocal is a new class that extends InheritableThreadLocal. It has a holder variable that keeps track of all TransmittableThreadLocal references.

2. When a task is actually submitted to the thread pool, we need to call TtlRunnable.get to build a wrapper around the task. While the wrapper is being constructed, still in the main thread, the values of all TransmittableThreadLocal instances are captured. These values belong to the thread that submits the task, which is not necessarily the thread that created the pool thread (remember that this is a thread pool scenario).

3. When the task runs, the wrapper takes the captured snapshot (all TransmittableThreadLocal instances and their values), removes keys and values from the current thread that are not in the snapshot, and writes the captured values into the current thread. After the task finishes, the thread's previous values are restored from the backup. This makes passing values from parent to child threads safe when threads are pooled.