This is the 20th day of my participation in the First Challenge 2022

First, background

We can use InheritableThreadLocal to pass local variables from a parent thread to a child thread, but with a thread pool this may not work, because a pooled thread is not created anew each time it is used. InheritableThreadLocal values are copied only when a thread is initialized (with inheritThreadLocals=true). Therefore, if the child thread used this time is an existing pooled thread, it is taken straight from the pool without going through initialization, and the parent thread's local variables are not copied.
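
The reuse problem described above can be reproduced with the JDK alone. The following is a minimal sketch, not code from the library: the class name InheritableThreadLocalPoolDemo and the helper observe() are made up for illustration. It submits two tasks to a single-thread pool and changes the parent's value in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; only JDK classes are used.
public class InheritableThreadLocalPoolDemo {
    static final InheritableThreadLocal<String> USERNAME = new InheritableThreadLocal<>();

    // Returns the values seen by two tasks that run on the same pooled thread.
    public static List<String> observe() {
        List<String> seen = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> readUsername = USERNAME::get;
        try {
            // The worker thread is created during this first submit,
            // so it copies the parent's current value.
            USERNAME.set("zhangShang");
            seen.add(pool.submit(readUsername).get());

            // The worker is reused: no thread initialization, so no new copy.
            USERNAME.set("liSi");
            seen.add(pool.submit(readUsername).get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [zhangShang, zhangShang]
    }
}
```

Both tasks see zhangShang: the second task never observes liSi, because the pooled thread copied the parent's value only once, when it was created.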

In most everyday application scenarios, thread pools are used to manage resources effectively. Alibaba has an open source project that solves this problem: TransmittableThreadLocal (TTL).

Second, introduction

TransmittableThreadLocal provides ThreadLocal value passing for execution components that pool and reuse threads, such as thread pools. It solves the problem of passing context during asynchronous execution.

The InheritableThreadLocal class in the JDK passes values from a parent thread to a child thread. But with execution components that pool and reuse threads, such as thread pools, the threads are created by the pool and reused over and over. In this case, passing ThreadLocal values along the parent-child thread relationship is meaningless. What the application actually needs is to pass the ThreadLocal values present when the task is submitted to the thread pool through to the time the task is executed.

There are many other application scenarios with TransmittableThreadLocal.

Third, basic usage

1. Add the TransmittableThreadLocal dependency

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>transmittable-thread-local</artifactId>
  <version>2.11.5</version>
</dependency>

2. Simple usage

public static void main(String[] args) throws InterruptedException {
  // Create a thread pool
  ExecutorService executorService = Executors.newSingleThreadExecutor();
  // Wrap the thread pool with the TtlExecutors utility; it returns the wrapper class ExecutorServiceTtlWrapper
  executorService = TtlExecutors.getTtlExecutorService(executorService);
  // Create the local variable that needs to be passed into the thread pool
  TransmittableThreadLocal<String> username = new TransmittableThreadLocal<>();

  // First call: the pool has no thread yet, so one is created. Even without TTL,
  // InheritableThreadLocal could pass the parent thread's local variable here.
  username.set("zhangShang");
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      System.out.println(username.get());
    }
  });

  // Second call: a single-thread pool reuses the thread created above,
  // so InheritableThreadLocal could not pass the local variable here.
  Thread.sleep(3000);
  username.set("liSi");
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      System.out.println(username.get());
    }
  });

  Thread.sleep(3000);
  username.set("wangWu");
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      System.out.println(username.get());
    }
  });

  // Let the JVM exit once the submitted tasks have finished
  executorService.shutdown();
}

The output is:

zhangShang
liSi
wangWu

Fourth, the principle

By definition, TransmittableThreadLocal extends InheritableThreadLocal and implements the TtlCopier interface, which has only a single copy method. So it is essentially an extension of InheritableThreadLocal.

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> 

TransmittableThreadLocal adds a holder field. Its purpose is to mark objects as transmittable between threads: every such object is added to the holder. One easy way to mark a class is to add a type field to it; another is to add the objects of that type to a static global collection. Later, everything found in the collection is known to carry the mark.

// 1. The holder itself is an InheritableThreadLocal object
// 2. The value held by the holder is a WeakHashMap<TransmittableThreadLocal<Object>, ?>
// 2.1 The WeakHashMap is used as a Set: its values are always null and never used.
// 2.2 WeakHashMap supports null values.
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
    new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  }

  /** Overrides childValue so that the child thread's holder starts as a copy of the parent thread's map. */
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
  }
};
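
The "collection as a marker" idea behind the holder can be sketched with JDK classes only. This is a simplified illustration, not TTL's actual code: MarkedLocal and isMarked are hypothetical names, and the per-thread registry is a plain ThreadLocal rather than an InheritableThreadLocal.

```java
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;

// Hypothetical sketch: a ThreadLocal subclass that registers itself in a
// per-thread set whenever a value is set, and unregisters itself on remove.
public class MarkedLocal<T> extends ThreadLocal<T> {
    // Per-thread registry backed by a WeakHashMap, so MarkedLocal instances
    // that are no longer referenced elsewhere can drop out of the set.
    private static final ThreadLocal<Set<MarkedLocal<?>>> HOLDER =
        new ThreadLocal<Set<MarkedLocal<?>>>() {
            @Override
            protected Set<MarkedLocal<?>> initialValue() {
                return Collections.newSetFromMap(new WeakHashMap<MarkedLocal<?>, Boolean>());
            }
        };

    @Override
    public void set(T value) {
        super.set(value);
        HOLDER.get().add(this); // mark: this variable has a value in this thread
    }

    @Override
    public void remove() {
        HOLDER.get().remove(this); // unmark before dropping the value
        super.remove();
    }

    // True if the given variable is registered in the current thread.
    public static boolean isMarked(MarkedLocal<?> local) {
        return HOLDER.get().contains(local);
    }

    public static void main(String[] args) {
        MarkedLocal<String> username = new MarkedLocal<>();
        username.set("zhangShang");
        System.out.println(isMarked(username)); // true
        username.remove();
        System.out.println(isMarked(username)); // false
    }
}
```

Iterating over such a registry is what lets a capture step find every variable that needs to be transmitted, without each variable carrying its own flag.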

The application code uses the TtlExecutors utility to wrap the thread pool object. The utility method only performs simple checks (whether the TTL agent is loaded, whether the pool is null, whether the pool has already been wrapped) and then returns the wrapper class ExecutorServiceTtlWrapper. Different thread pool types have different wrapper classes.

@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
  if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
    return executorService;
  }
  return new ExecutorServiceTtlWrapper(executorService);
}
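
The "check, then wrap" logic above is an ordinary decorator guarded by a marker interface. A sketch using only the JDK follows; Wrapped, WrappingExecutor and wrap are hypothetical names that stand in for TtlEnhanced, ExecutorServiceTtlWrapper and getTtlExecutorService, and the agent check is left out.

```java
import java.util.concurrent.Executor;

// Hypothetical demo class; only JDK classes are used.
public class WrapOnceDemo {
    // Marker interface: "this executor is already wrapped".
    interface Wrapped {}

    // Decorator that forwards to the delegate. A real wrapper would also
    // decorate the submitted task before forwarding it.
    static final class WrappingExecutor implements Executor, Wrapped {
        private final Executor delegate;

        WrappingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    // Returns null for null, the same instance if it is already wrapped,
    // and a new wrapper otherwise.
    public static Executor wrap(Executor executor) {
        if (executor == null || executor instanceof Wrapped) {
            return executor;
        }
        return new WrappingExecutor(executor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;   // runs each task on the calling thread
        Executor once = wrap(direct);
        Executor twice = wrap(once);
        System.out.println(once == twice); // true: no double wrapping
        once.execute(() -> System.out.println("task ran"));
    }
}
```

The marker interface keeps wrapping idempotent, so calling the utility on an already wrapped pool costs nothing and never stacks decorators.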

Step into the wrapper class ExecutorServiceTtlWrapper. You will notice that both ExecutorServiceTtlWrapper#submit and ExecutorTtlWrapper#execute wrap the task in a TtlCallable or TtlRunnable object, which performs some extra logic before the real run method executes.

/** The submit method as implemented in ExecutorServiceTtlWrapper */
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
  return executorService.submit(TtlCallable.get(task));
}

/** The execute method as implemented in ExecutorTtlWrapper */
@Override
public void execute(@NonNull Runnable command) {
  executor.execute(TtlRunnable.get(command));
}


So the core logic to focus on is in TtlCallable#call() or TtlRunnable#run(). The following uses TtlCallable as an example; TtlRunnable works the same way. Before analyzing the call() method, look at the Transmitter class.

public static class Transmitter {
  /** Captures all TransmittableThreadLocal values and all registered ThreadLocal values in the current thread. */
  @NonNull
  public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
  }
	
  /** Captures the TransmittableThreadLocal values: every TTL in the holder is copied into a HashMap, which is returned. */
  private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
      ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
  }

  /** Captures the values of registered ThreadLocals, that is, ordinary ThreadLocal instances that have been registered with TTL so that they are also passed along when thread pool local variables are passed. */
  private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      final TtlCopier<Object> copier = entry.getValue();
      threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
  }

  /** Replaces the local variables of the thread running the task with the captured ones, and returns a backup of that thread's existing values. The backup is used to restore the values after run/call has executed. */
  @NonNull
  public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), 
                        replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
  }
	
  /** Replays the captured TransmittableThreadLocal values into the current thread. */
  @NonNull
  private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    // Create a backup copy
    HashMap<TransmittableThreadLocal<Object>, Object> backup = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();
      // Make a copy of the local variable of the current thread
      backup.put(threadLocal, threadLocal.get());

      // If a thread variable does not exist in the calling thread but does exist in the thread pool, the corresponding local variable in the thread pool is deleted
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }
    // Pass the captured TTL value into the thread TTL obtained from the thread pool.
    setTtlValuesTo(captured);
    // Extension point: calls the beforeExecute method of each TTL. The default implementation is empty.
    doExecuteCallback(true);
    return backup;
  }

  private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
    final HashMap<ThreadLocal<Object>, Object> backup = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      backup.put(threadLocal, threadLocal.get());
      final Object value = entry.getValue();
      if (value == threadLocalClearMark) threadLocal.remove();
      else threadLocal.set(value);
    }
    return backup;
  }

  /** Clears all TTL and TL values in the current thread and returns a backup of the values as they were before clearing. */
  @NonNull
  public static Object clear() {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }
    return replay(new Snapshot(ttl2Value, threadLocal2Value));
  }

  /** Restores the backed-up TTL and TL values. */
  public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
  }

  private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // Extension point, call afterExecute of TTL
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      if (!backup.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // Restore local variables to the backup version
    setTtlValuesTo(backup);
  }

  private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
      TransmittableThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  /** Snapshot class that holds the TTL and TL values. */
  private static class Snapshot {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;

    private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
      this.ttl2Value = ttl2Value;
      this.threadLocal2Value = threadLocal2Value;
    }
  }
}

Now step into the TtlCallable#call() method.

@Override
public V call() throws Exception {
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after call!");
  }
  // Call replay to pass the captured local variables of the submitting thread into the pool thread.
  // It returns a backup of the pool thread's local variables as they were before being overwritten.
  Object backup = replay(captured);
  try {
    // Invoke the actual task
    return callable.call();
  } finally {
    // Restore the pool thread's local variables from the backup.
    restore(backup);
  }
}

That covers most of the core code for passing local variables into a thread pool. When a TtlCallable object is created, the capture() method is called to capture the calling thread's local variables. When call() executes, the captured values replace the local variables of the pool thread that runs the task, and when the call completes, that thread's local variables are restored to what they were before the call.
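
The capture, replay and restore cycle summarized above can be reduced to a few lines for a single variable. The following is a sketch under stated assumptions, not TTL's implementation: CaptureReplayDemo, ContextRunnable and observe() are hypothetical names, only one plain ThreadLocal is handled, and TTL generalizes the same steps to every variable registered in the holder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; only JDK classes are used.
public class CaptureReplayDemo {
    static final ThreadLocal<String> USERNAME = new ThreadLocal<>();

    // Captures USERNAME when constructed (on the submitting thread), replays it
    // on the worker thread, then restores the worker's previous value.
    static final class ContextRunnable implements Runnable {
        private final Runnable task;
        private final String captured = USERNAME.get(); // capture

        ContextRunnable(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            String backup = USERNAME.get(); // the worker's own value
            USERNAME.set(captured);         // replay
            try {
                task.run();
            } finally {
                // restore
                if (backup == null) {
                    USERNAME.remove();
                } else {
                    USERNAME.set(backup);
                }
            }
        }
    }

    // Runs two wrapped tasks and one unwrapped read on a single pooled thread.
    public static List<String> observe() {
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> plainRead = USERNAME::get;
        try {
            USERNAME.set("zhangShang");
            pool.submit(new ContextRunnable(() -> seen.add(USERNAME.get()))).get();
            USERNAME.set("liSi");
            pool.submit(new ContextRunnable(() -> seen.add(USERNAME.get()))).get();
            // Unwrapped task: the worker's own value was restored, so it is null.
            seen.add(pool.submit(plainRead).get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USERNAME.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [zhangShang, liSi, null]
    }
}
```

Each wrapped task sees the value that was current when it was submitted, even though the pooled thread is reused, and the final unwrapped read shows that nothing leaks into the worker once a task has finished.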