This is the 20th day of my participation in the First Challenge 2022
First, background
We can use InheritableThreadLocal to pass thread-local variables from a parent thread to a child thread. With a thread pool, however, this often fails, because a pool thread is not created each time it is used. The values of an InheritableThreadLocal are copied only once, while the child thread is being initialized (when inheritThreadLocals is true and the parent thread's inheritableThreadLocals map is not null). If the thread that runs the task already exists in the pool, taking it from the pool does not go through initialization again, so the parent thread's values are not copied.
In most everyday applications, thread pools are used to manage threads efficiently. Alibaba has an open source project that solves this problem: TransmittableThreadLocal (TTL).
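The effect is easy to reproduce with the JDK alone. The sketch below is only an illustration: the class name StaleInheritableDemo and the USER variable are made up for this example. It submits two tasks to a single-thread pool and records the value each task reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleInheritableDemo {
    // Hypothetical context variable, used only for this illustration.
    static final InheritableThreadLocal<String> USER = new InheritableThreadLocal<>();

    // Submits two tasks to a single-thread pool and returns what each task observed.
    static List<String> observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        Callable<String> read = USER::get;
        try {
            USER.set("first");
            // The worker thread is created during this submit, so it inherits "first".
            seen.add(pool.submit(read).get());

            USER.set("second");
            // The same worker is reused; no thread is initialized, so nothing is copied.
            seen.add(pool.submit(read).get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USER.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [first, first]
    }
}
```

The second task still reads first: the pooled worker keeps the value it inherited when it was created and never sees the later update made by the submitting thread.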
Second, introduction
TransmittableThreadLocal passes ThreadLocal values across execution components that pool and reuse threads, such as thread pools, which solves the problem of context passing during asynchronous execution.
The InheritableThreadLocal class in the JDK passes values from a parent thread to a child thread. But with execution components that pool and reuse threads, such as thread pools, the threads are created by the pool and reused across tasks. In this case the parent-child relationship between threads is meaningless; what the application actually needs is to pass the ThreadLocal values present when the task is submitted to the pool through to the time the task executes.
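One way to picture "pass the value at submit time" is a wrapper that reads the value in the submitting thread and installs it in the pool thread just before the task runs. This is a minimal sketch using only the JDK; the names SubmitTimeCapture, USER and wrap are hypothetical, and it is not how TransmittableThreadLocal is implemented, only the idea that it generalizes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SubmitTimeCapture {
    // Hypothetical context variable; a plain ThreadLocal, so nothing is inherited.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    // Wraps a task so that it runs with the value USER had when wrap() was called.
    static <T> Callable<T> wrap(Callable<T> task) {
        final String captured = USER.get();         // read in the submitting thread
        return () -> {
            final String previous = USER.get();     // read in the pool thread
            USER.set(captured);
            try {
                return task.call();
            } finally {
                // Simplification: a null previous value is treated as "not set".
                if (previous == null) USER.remove(); else USER.set(previous);
            }
        };
    }

    // Submits one wrapped task per user name to a single reused worker thread.
    static List<String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        Callable<String> read = USER::get;
        try {
            for (String user : new String[] {"first", "second"}) {
                USER.set(user);
                seen.add(pool.submit(wrap(read)).get());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USER.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, second]
    }
}
```

Because the value is captured when the task is wrapped rather than when the worker thread is created, each task sees the value that was current at submit time, even though the same worker runs both.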
There are many other application scenarios with TransmittableThreadLocal.
Third, basic use
1. Add the TransmittableThreadLocal dependency
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.5</version>
</dependency>
2. Simple usage
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// TtlDemo is a placeholder class name added so that the example compiles on its own.
public class TtlDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create a thread pool
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // Wrap the pool with the TtlExecutors utility; it returns an ExecutorServiceTtlWrapper
        executorService = TtlExecutors.getTtlExecutorService(executorService);
        // Create the thread-local variable that needs to be passed to the pool
        TransmittableThreadLocal<String> username = new TransmittableThreadLocal<>();
        // First call: the pool has no thread yet, so one is created now. Even without TTL,
        // an InheritableThreadLocal would pass the parent thread's value here.
        username.set("zhangShang");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(username.get());
            }
        });
        // Second call: the single-thread pool reuses the thread created above, so an
        // InheritableThreadLocal would no longer pass the new value.
        Thread.sleep(3000);
        username.set("liSi");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(username.get());
            }
        });
        Thread.sleep(3000);
        username.set("wangWu");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(username.get());
            }
        });
        // Shut the pool down so that the JVM can exit once the last task has run
        executorService.shutdown();
    }
}
The output is:
zhangShang
liSi
wangWu
Fourth, how it works
By definition, TransmittableThreadLocal extends InheritableThreadLocal and implements the TtlCopier interface, which has only one method, copy. So it is essentially an extension of InheritableThreadLocal.
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>
TransmittableThreadLocal adds a holder field. Its purpose is to mark which objects can be transmitted across threads: every TransmittableThreadLocal that should be passed is added to it. One simple way to mark an object is to add a type field to its class; another is to add objects of that type to a static global collection, so that everything in the collection carries the mark when it is used later. The holder follows the second approach.
// 1. The holder itself is an InheritableThreadLocal object.
// 2. The value type of the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
// 2.1 The WeakHashMap is used as a Set: its values are always null and never used.
// 2.2 WeakHashMap supports null values.
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
        new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
            }
            /**
             * Overrides childValue so that the child thread starts with its own copy of the parent thread's map.
             */
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
            }
        };
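The two ideas in the holder, a weak collection used as a registry and a childValue that copies instead of sharing, can be tried out with the JDK alone. The following is a sketch under assumptions: HolderSketch and its members are hypothetical names, and it uses Collections.newSetFromMap over a WeakHashMap rather than the null-valued map shown above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;

class HolderSketch {
    // Per-thread registry of "marked" keys. It is backed by a WeakHashMap so that
    // keys no longer referenced anywhere else can be garbage collected.
    static final InheritableThreadLocal<Set<Object>> HOLDER =
            new InheritableThreadLocal<Set<Object>>() {
                @Override
                protected Set<Object> initialValue() {
                    return newWeakSet();
                }

                @Override
                protected Set<Object> childValue(Set<Object> parentValue) {
                    // Copy instead of sharing, so a child's changes stay in the child.
                    Set<Object> copy = newWeakSet();
                    copy.addAll(parentValue);
                    return copy;
                }
            };

    private static Set<Object> newWeakSet() {
        return Collections.newSetFromMap(new WeakHashMap<Object, Boolean>());
    }

    // Returns [child sees a, child sees b, parent sees b].
    static List<Boolean> demo() {
        final Object a = new Object();
        final Object b = new Object();
        HOLDER.get().add(a);                      // registered by the parent thread
        final boolean[] childSaw = new boolean[2];
        Thread child = new Thread(() -> {
            HOLDER.get().add(b);                  // registered by the child thread only
            childSaw[0] = HOLDER.get().contains(a);
            childSaw[1] = HOLDER.get().contains(b);
        });
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean parentHasB = HOLDER.get().contains(b);
        HOLDER.remove();
        return Arrays.asList(childSaw[0], childSaw[1], parentHasB);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [true, true, false]
    }
}
```

The child starts with a copy of the parent's registrations, and what it registers afterwards does not leak back into the parent's set.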
The application code uses the TtlExecutors utility to wrap the thread pool object. The utility method only performs a few checks, such as whether the input pool is null or has already been wrapped, and then returns the wrapper class ExecutorServiceTtlWrapper. There are different wrapper classes for different thread pool types.
@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
    if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
        return executorService;
    }
    return new ExecutorServiceTtlWrapper(executorService);
}
Look inside the wrapper class ExecutorServiceTtlWrapper. You will notice that both ExecutorServiceTtlWrapper#submit and ExecutorTtlWrapper#execute wrap the task in a TtlCallable or TtlRunnable, which is used to run some extra logic around the actual run or call method.
/**
 * submit as implemented in ExecutorServiceTtlWrapper
 */
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
    return executorService.submit(TtlCallable.get(task));
}
/**
 * execute as implemented in ExecutorTtlWrapper
 */
@Override
public void execute(@NonNull Runnable command) {
    executor.execute(TtlRunnable.get(command));
}
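The wrapping pattern described here (check for null or an already-wrapped pool, then decorate every submitted task) can be sketched with the JDK alone. The names below (WrappingExecutorSketch, Enhanced, TracedRunnable, EnhancedExecutor) are hypothetical and only mirror the roles of the classes named in the text; they are not the library's classes.

```java
import java.util.concurrent.Executor;

class WrappingExecutorSketch {
    // Hypothetical marker interface: anything implementing it is already wrapped.
    interface Enhanced {}

    // Hypothetical task wrapper. A real one would capture context in the
    // constructor and replay and restore it around delegate.run().
    static final class TracedRunnable implements Runnable, Enhanced {
        private final Runnable delegate;

        TracedRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    // Decorator that wraps every task before handing it to the real executor.
    static final class EnhancedExecutor implements Executor, Enhanced {
        private final Executor delegate;

        EnhancedExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(new TracedRunnable(command));
        }
    }

    // Returns the input unchanged when it is null or already wrapped.
    static Executor wrap(Executor executor) {
        if (executor == null || executor instanceof Enhanced) {
            return executor;
        }
        return new EnhancedExecutor(executor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;   // runs each task in the calling thread
        Executor wrapped = wrap(direct);
        wrapped.execute(() -> System.out.println("task ran"));
        System.out.println(wrap(wrapped) == wrapped); // true
    }
}
```

Using a marker interface keeps the wrap method idempotent: wrapping an already-wrapped executor returns the same object instead of stacking decorators.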
So the core logic should be in TtlCallable#call() or TtlRunnable#run(). The following uses TtlCallable as an example; TtlRunnable works the same way. Before analyzing the call() method, look at the Transmitter class:
public static class Transmitter {
    /**
     * Captures the values of all TransmittableThreadLocals and registered ThreadLocals in the current thread.
     */
    @NonNull
    public static Object capture() {
        return new Snapshot(captureTtlValues(), captureThreadLocalValues());
    }
    /**
     * Captures the TransmittableThreadLocal values: copies the value of every entry in the holder into a HashMap and returns it.
     */
    private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
        HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
                new HashMap<TransmittableThreadLocal<Object>, Object>();
        for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
            ttl2Value.put(threadLocal, threadLocal.copyValue());
        }
        return ttl2Value;
    }
    /**
     * Captures the values of the registered ThreadLocals, that is, ordinary ThreadLocals that were
     * registered with TTL so that they can also be passed to thread pool threads.
     */
    private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
        final HashMap<ThreadLocal<Object>, Object> threadLocal2Value =
                new HashMap<ThreadLocal<Object>, Object>();
        for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
            final ThreadLocal<Object> threadLocal = entry.getKey();
            final TtlCopier<Object> copier = entry.getValue();
            threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
        }
        return threadLocal2Value;
    }
    /**
     * Replaces the thread-local values of the current (pool) thread with the captured values and
     * returns a backup of the values the thread held before.
     * The backup is used to restore the thread after the run/call method has executed.
     */
    @NonNull
    public static Object replay(@NonNull Object captured) {
        final Snapshot capturedSnapshot = (Snapshot) captured;
        return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),
                replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
    }
    /**
     * Replaces the TransmittableThreadLocal values.
     */
    @NonNull
    private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
        // Create the backup
        HashMap<TransmittableThreadLocal<Object>, Object> backup =
                new HashMap<TransmittableThreadLocal<Object>, Object>();
        for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
            TransmittableThreadLocal<Object> threadLocal = iterator.next();
            // Back up the current thread's value
            backup.put(threadLocal, threadLocal.get());
            // If a TTL exists in the pool thread but was not captured from the submitting thread,
            // remove it from the pool thread so that the task does not see it
            if (!captured.containsKey(threadLocal)) {
                iterator.remove();
                threadLocal.superRemove();
            }
        }
        // Set the captured TTL values on the thread obtained from the pool
        setTtlValuesTo(captured);
        // Extension point: calls the beforeExecute method of TTL. The default implementation is empty
        doExecuteCallback(true);
        return backup;
    }
    private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
        final HashMap<ThreadLocal<Object>, Object> backup =
                new HashMap<ThreadLocal<Object>, Object>();
        for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
            final ThreadLocal<Object> threadLocal = entry.getKey();
            backup.put(threadLocal, threadLocal.get());
            final Object value = entry.getValue();
            if (value == threadLocalClearMark) threadLocal.remove();
            else threadLocal.set(value);
        }
        return backup;
    }
    /**
     * Clears all TTL and registered ThreadLocal values in the current thread and returns a backup of the values that were cleared.
     */
    @NonNull
    public static Object clear() {
        final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
                new HashMap<TransmittableThreadLocal<Object>, Object>();
        final HashMap<ThreadLocal<Object>, Object> threadLocal2Value =
                new HashMap<ThreadLocal<Object>, Object>();
        for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
            final ThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal2Value.put(threadLocal, threadLocalClearMark);
        }
        return replay(new Snapshot(ttl2Value, threadLocal2Value));
    }
    /**
     * Restores the backed-up values.
     */
    public static void restore(@NonNull Object backup) {
        final Snapshot backupSnapshot = (Snapshot) backup;
        restoreTtlValues(backupSnapshot.ttl2Value);
        restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
    }
    private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
        // Extension point: calls the afterExecute method of TTL
        doExecuteCallback(false);
        for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
            TransmittableThreadLocal<Object> threadLocal = iterator.next();
            // Remove TTLs that are not in the backup
            if (!backup.containsKey(threadLocal)) {
                iterator.remove();
                threadLocal.superRemove();
            }
        }
        // Restore the thread-local values to the backed-up version
        setTtlValuesTo(backup);
    }
    private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
        for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
            TransmittableThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal.set(entry.getValue());
        }
    }
    private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
        for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
            final ThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal.set(entry.getValue());
        }
    }
    /**
     * Snapshot class; holds the captured TTL and ThreadLocal values.
     */
    private static class Snapshot {
        final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
        final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
        private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value,
                         HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
            this.ttl2Value = ttl2Value;
            this.threadLocal2Value = threadLocal2Value;
        }
    }
}
Now look at the TtlCallable#call() method:
@Override
public V call() throws Exception {
    Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after call!");
    }
    // Call replay to install the values captured from the submitting thread into the pool thread.
    // It returns a backup of the pool thread's values from before they were overwritten.
    Object backup = replay(captured);
    try {
        // Invoke the actual task
        return callable.call();
    } finally {
        // Restore the pool thread's values from the backup
        restore(backup);
    }
}
That covers the core code for passing thread-local variables through a thread pool. When a TtlCallable is created, capture() is called to capture the submitting thread's thread-local variables. When call() executes, the captured values replace the thread-local variables of the pool thread that runs the task, and when the call completes, the pool thread's variables are restored to what they were before the call.
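The capture, replay and restore cycle can be condensed into a small JDK-only sketch. Everything here (MiniTransmitter, its registry, and the rule that a null value means "not set") is a simplifying assumption made for illustration; the real Transmitter additionally handles copiers, a clear mark, and TTLs that appear only in the pool thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MiniTransmitter {
    // Hypothetical registry of the thread-locals that should travel with tasks.
    private static final List<ThreadLocal<Object>> REGISTERED = new ArrayList<>();

    static synchronized void register(ThreadLocal<Object> threadLocal) {
        REGISTERED.add(threadLocal);
    }

    // Step 1, in the submitting thread: snapshot every registered thread-local.
    static synchronized Map<ThreadLocal<Object>, Object> capture() {
        Map<ThreadLocal<Object>, Object> snapshot = new HashMap<>();
        for (ThreadLocal<Object> threadLocal : REGISTERED) {
            snapshot.put(threadLocal, threadLocal.get());
        }
        return snapshot;
    }

    // Step 2, in the pool thread: install the snapshot, return the previous values.
    static Map<ThreadLocal<Object>, Object> replay(Map<ThreadLocal<Object>, Object> captured) {
        Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
        for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
            backup.put(entry.getKey(), entry.getKey().get());
            apply(entry.getKey(), entry.getValue());
        }
        return backup;
    }

    // Step 3, in the pool thread: put the pool thread's own values back.
    static void restore(Map<ThreadLocal<Object>, Object> backup) {
        for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
            apply(entry.getKey(), entry.getValue());
        }
    }

    // Simplification: a null value is treated as "not set".
    private static void apply(ThreadLocal<Object> threadLocal, Object value) {
        if (value == null) threadLocal.remove(); else threadLocal.set(value);
    }

    // Captures when the wrapper is created; replays and restores around the call.
    static <T> Callable<T> wrap(Callable<T> task) {
        final Map<ThreadLocal<Object>, Object> captured = capture();
        return () -> {
            Map<ThreadLocal<Object>, Object> backup = replay(captured);
            try {
                return task.call();
            } finally {
                restore(backup);
            }
        };
    }

    // Returns [value seen by the wrapped task, value the pool thread sees afterwards].
    static List<Object> demo() {
        final ThreadLocal<Object> user = new ThreadLocal<>();
        register(user);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable seed = () -> user.set("pool-own");   // the pool thread's own value
            pool.submit(seed).get();
            user.set("caller");                           // the submitting thread's value
            Callable<Object> read = user::get;
            Object duringTask = pool.submit(wrap(read)).get();
            Object afterTask = pool.submit(read).get();
            return Arrays.asList(duringTask, afterTask);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            user.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [caller, pool-own]
    }
}
```

The wrapped task reads the submitting thread's value, and the next unwrapped task on the same worker reads the worker's own value again, which is the restore step at work.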