全链路追踪必备组件之 TransmittableThreadLocal 详解

作者 | 姜日游

杏仁 Java 工程师。今日持续在线的程序玩家。

我们都知道 ThreadLocal 作为一种多线程处理手段,将数据限制在当前线程中,避免多线程情况下出现错误。

一般的使用场景大多会是服务上下文、分布式日志跟踪。

但是在业务代码中,为了提高响应速度,将多个复杂、长时间的计算或调用过程异步进行,让主线程可以先进行其他操作。像我们项目中最常用的就是  CompletableFuture  了,默认会使用预设的  ForkJoin ThreadPool  执行。

这也就引入了一个问题,如果保证 ThreadLocal 的信息能够传递异步线程?通过 ThreadLocal?通过线程池?通过 Runnable 或者 Callable?

有些场景丢了就丢了,比如目前我们的服务上下文传递,一般都没有很严谨的处理 ......

但是,如果是分布式追踪的场景,丢了就要累惨了。

注:以下代码仅保留关键代码,其余无关紧要则忽略

InheritableThreadLocal

InheritableThreadLocal  是 JDK 本身自带的一种线程传递解决方案。顾名思义,由当前线程创建的线程,将会继承当前线程里 ThreadLocal 保存的值。

其本质上是 ThreadLocal 的一个子类,通过覆写父类中创建初始化的相关方法来实现的。我们知道,ThreadLocal 实际上是 Thread 中保存的一个  ThreadLocalMap  类型的属性搭配使用才能让广大 Javaer 直呼真香的,所以 InheritableThreadLocal 也是如此。

public class Thread implements Runnable {
    // 如果单纯使用 ThreadLocal,则 Thread 使用该属性值保存 ThreadLocalMap
    ThreadLocal.ThreadLocalMap threadLocals = null;
        // 否则使用该属性值
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
  
    private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc) {
          Thread parent = currentThread();

          if (parent.inheritableThreadLocals != null)
              this.inheritableThreadLocals =
                  ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
      }
}

init  方法作为 Thread 初始化的核心方法,相关 ThreadLocal 代码已经全部摘出。如我们所见,仅仅就只是这一点改动。在创建线程时,如果当前线程的 inheritableThreadLocals 不为空,则根据它创建出新的 InheritableThreadLocals 保存到新线程中。

Ps : ThreadLocal 作为老牌选手,默认都是使用时,直接初始化 Thread 的 threadLocals 属性。

只有像是 InheritableThreadLocal 这样的后辈,需要特殊处理一下。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    
    protected T childValue(T parentValue) {
        return parentValue;
    }
  
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

   // Thread 中 ThreadLocalMap 不存在时的初始化动作,需要改为初始化 inheritableThreadLocals
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

因此,原先 ThreadLocal 会从 Thread 的 threadLocals 获取 Map,那么 InheritableThreadLocal 就要从 inheritableThreadLocals 拿了。 childValue  方法用作从父线程中获取值,可以看到,这边是直接返回的,如果是复杂对象,就直接传引用了。当然,继承覆写该方法,可以实现浅拷贝、深拷贝等等方式。

缺点

这样的方式解决了创建线程时的 ThreadLocal 传值的问题,但不可能一直创建新的线程,那实在耗费资源。因此通用做法是线程复用,比如线程池呗。但是,递交异步任务是相应的 ThreadLocal 的值就无法传递过去了。

我们希望的是,异步线程执行任务的所使用的 ThreadLocal 值,是将任务提交给线程时主线程持有的。即从任务创建时传递到任务执行时。

想想,如果我们在创建异步任务时,在任务代码外获取当前线程的值临时保存,再传递给执行线程,在真正的任务执行前保存到当前线程即可。对,确实可以,但是麻烦不?每个创建异步任务的地方都要写。

那就把它封装到递交任务的方法中。

RunnableWrapper & CallableWrapper

假设按照服务上下文的场景举例,目前项目中的执行异步操作的方案是定义一个  AsyncExecutor ,并声明执行 Supplier 返回 CompletableFuture 的方法。

既然这样就可以对方法做一些改造,保证上下文的传递。

private static ThreadLocal<String> contextHolder = new ThreadLocal<>();

public static <T> CompletableFuture<T> invokeToCompletableFuture(Supplier<T> supplier, String errorMessage) {
    // 第一步
    String context = contextHolder.get();
    Supplier<T> newSupplier = () -> {
         // 第二步
        String origin = contextHolder.get();
        try {
            contextHolder.set(context);
            // 第三步
            return supplier.get();
        } finally {
            // 第四步
            contextHolder.set(origin);
            log.info(origin);
        }
    };
    return CompletableFuture.supplyAsync(newSupplier).exceptionally(e -> {
        throw new ServerErrorException(errorMessage, e);
    });
}
// test code
public static void main(String[] args) throws ExecutionException, InterruptedException {
    contextHolder.set("main");
    log.info(contextHolder.get());
    CompletableFuture<String> context = invokeToCompletableFuture(() -> test.contextHolder.get(), "error");
    log.info(context.get());
}

总得来说,就是在将异步任务派发给线程池时,对其做一下上下文传递的处理。

第一步:主线程获取上下文,传递给任务暂存。

1 之后的操作都将是异步执行线程操作的。

第二步:异步执行线程将原有上下文取出,暂时保存。并将主线程传递过来的上下文设置。

第三步:执行异步任务

第四步:将原有上下文设置回去。

可以看到一般并不会在异步线程执行完任务之后直接进行  remove 。而是一开始取出原上下文(可能为 NULL,也可能是线程创建时 InheritableThreadLocal 继承过来的值。当然后续也会被清除的),并在任务执行结束重新放回。这样的方式可以说是异步 ThreadLocal 传递的标准范式(大佬说的)。

这样子既起到了显式清除主线程带来的上下文,也避免了如果线程池的拒绝策略为  CallerRunsPolicy ,后续处理时上下文丢失的问题。

Supplier 不算是典型例子,更为典型的应该是 Runnable 和 Callable。不过举一推三,都是修饰一下,再丢给线程池。

public final class DelegatingContextRunnable implements Runnable {

    private final Runnable delegate;

    private final Optional<String> delegateContext;

    public DelegatingContextRunnable(Runnable delegate,
                                       Optional<String> context) {
        assert delegate != null;
        assert context != null;

        this.delegate = delegate;
        this.delegateContext = context;
    }

    public DelegatingContextRunnable(Runnable delegate) {
        // 修饰原有的任务,并保存当前线程的值
        this(delegate, ContextHolder.get());
    }

    public void run() {
        Optional<String> originalContext = ContextHolder.get();

        try {
            ContextHolder.set(delegateContext);
            delegate.run();
        } finally {
            ContextHolder.set(originalContext);
        }
    }
}

public final void execute(Runnable task) {
  // 递交给真正的执行线程池前,对任务进行修饰
  executor.execute(wrap(task));
}

protected final Runnable wrap(Runnable task) {
  return new DelegatingContextRunnable(task);
}

后续,使用线程池执行异步任务的时候,事先对任务进行封装代理即可。

不过,还是比较麻烦。自定义的线程池,需要显式处理任务。而且更严谨的做法,不同业务场景之间的线程池应该是隔离的,以免受到影响,就比如  Hystrix  的线程池。

每一个线程池都要处理就麻烦了。所以换个思路,代理线程池。

DelegaingExecutor

这个就不多说了,实际很简单,就照搬我们上下文相关类库。

public class DelegatingContextExecutor implements Executor  {

    private final Executor delegate;


    public DelegatingContextExecutor(Executor delegateExecutor) {
        this.delegate = delegateExecutor;
    }

    public final void execute(Runnable task) {
        delegate.execute(wrap(task));
    }

    protected final Runnable wrap(Runnable task) {
        return new DelegatingContextRunnable(task);
    }

    protected final Executor getDelegateExecutor() {
        return delegate;
    }
}
// 自定义的线程池,用于执行项目中的异步任务
public Executor queryExecutor() {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
    // 封装服务上下文的线程池修饰
    return new DelegatingContextExecutorService(threadPoolExecutor);
}

问题似乎都解决了,那还有什么?

对,适用场景不够通用。上面的做法只针对于指定的 ThreadLocal,其他场景例如链路追踪、应用容器或上层框架跨应用代码给下层  SDK  传递信息(像是契约包  Feign  的执行线程)。

那么  TransmittableThreadLocal  就是为了解决通用化场景而设计的。

TransmittableThreadLocal

作为一个核心代码不超过一千行的工具框架,实际使用和架构设计都十分简单。

其使用方法本质上与上述提到的 CallableWrapper 和 DelegatingExecutor 是一样的,并且为了方便使用,对外提供了静态工厂方法或工具类。

public final void execute(Runnable task) {
  executor.execute(TtlCallable.get(task));
}
// 或者
public Executor queryExecutor() {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
    // 封装服务上下文的线程池修饰
    return TtlExecutors.getTtlExecutorService(threadPoolExecutor);
}

当然,前提是 ThreadLocal 必须使用 TransmittableThreadLocal。至于为什么,我们源码分析时再细细说来。

先看看核心实现类的结构,以 Callable 和 ExecutorService 为例。

整体主要是三个部分:任务( TtlCallable )、线程池( ExecutorServiceTtlWrapper )、ThreadLocal( TransmittableThreadLocal )。其实对应上述讲到的 CallableWrapper、DelegatingExecutor、InheritableThreadLocal。

但是无论是任务和线程池,本身还是依赖于 TransmittableThreadLocal 对于存储值的管理。

用官方的时序图直观展示一下,框架是如何起作用的:

可以看到,从第三步创建完任务,第四步修饰完任务,后续大部分过程都依赖于 TransmittableThreadLocal 或 TransmittableThreadLocal 中声明的静态工具类  Transmitter 。Transmitter 主要负责 ThreadLocal 的管理和值的传递。

首先看看 TtlCallable。

TtlCallable

该类实际上是 JDK Callable 的一个修饰。类比于,上文讲到的 RunnableWrapper,只是为了临时保存父线程 ThreadLocal 的值,以便在执行任务之前,赋值到子线程中。

因此,TtlCallable 和 TtlExecutorService 都实现了 TtlWrapper 接口。也许你以为,该接口是实现修饰的语义,但是它只提供了一个方法,表达了拆修饰的语义:

public interface TtlWrapper<T> extends TtlEnhanced {
    @NonNull
    T unwrap();
}

毕竟核心是修饰,所以该类主要为了提供修饰的核心抽象,便于框架对其进行判断和管理。

该方法语义要求,必须返回修饰的源对象或下层对象(毕竟可能修饰了很多层),因此也是空值安全的。null 进来,null 出去。

public final class TtlCallable<V> implements Callable<V>, TtlWrapper<Callable<V>>, TtlEnhanced, TtlAttachments {
    // 保存父线程的 ThreadLocal 快照
    private final AtomicReference<Object> capturedRef;
    // 实际执行任务
    private final Callable<V> callable;
    // 判断是否执行完,清除任务所保存的 ThreadLocal 快照
    private final boolean releaseTtlValueReferenceAfterCall;

    private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
        // 1.创建时, 从 Transmitter 抓取快照
        this.capturedRef = new AtomicReference<Object>(capture());
        this.callable = callable;
        this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
    }

    @Override
    public V call() throws Exception {
        Object captured = capturedRef.get();
        // 如果 releaseTtlValueReferenceAfterCall 为 true,则在执行线程取出快照后清除。
        if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after call!");
        }
                // 2.使用 Transmitter 将快照重做到当前执行线程,并将原来的值取出
        Object backup = replay(captured);
        try {
            // 3.执行任务
            return callable.call();
        } finally {
            // 4.Transmitter 重新将原值放回执行线程
            restore(backup);
        }
    }
}

可以看到,从实例化到任务执行的顺序,和上文讲到的 CallableWrapper 是完全一致的。但是在其之上,提供了更为完整的特性和线程安全性。

  • releaseTtlValueReferenceAfterCall  的可控,保证了任务执行完,依然被业务代码持有的场景下,避免 ThreadLocal 快照继续持有而造成的内存泄漏。毕竟,对于业务方来说,这个东西是我不关心的,无需跟随任务本身的生命周期。

  • 快照使用  AtomicReference  保存,保证任务误重用下,清除快照动作的多线程安全性。

上面两者的合用,相当于期望一个任务只能被执行一次,尽量避免任务重用和继续持有。

任务重用的间隔之间,可能出现 ThreadLocal 值被修改的情况,那么后一次任务执行时,快照实际是不准确的。业务场景应该尽量避免这种情况出现才对。

该类提供了静态工厂方法,方便业务方创建。

public static <T> TtlCallable<T> get(@Nullable Callable<T> callable) {
    return get(callable, false);
}

@Nullable
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall) {
    return get(callable, releaseTtlValueReferenceAfterCall, false);
}

@Nullable
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {
    if (null == callable) return null;

    if (callable instanceof TtlEnhanced) {
        // avoid redundant decoration, and ensure idempotency
        if (idempotent) return (TtlCallable<T>) callable;
        else throw new IllegalStateException("Already TtlCallable!");
    }
    return new TtlCallable<T>(callable, releaseTtlValueReferenceAfterCall);
}

可以看到,默认工厂方法的  releaseTtlValueReferenceAfterCall  是 false。如果想要使用执行完清除,就要注意方法的使用。

其次,这里还有一个幂等的参数控制:  idempotent  。如果传入的 Callable 已经是修饰过的,那么根据 idempotent 的值,要么返回原 Callable,要么报错。

我觉得这里有个两难的点。

我们调用静态工厂方法期望得到的是调用该方法时 ThreadLocal 的快照。所以理论上,应该无论传入什么 Callable,都应该返回一个保存当前本地线程值快照的 TtlCallable。

但是,如果这样的逻辑下,传入的是已修饰的类,那么最后结果就是在任务执行时,会造成外层修饰的快照被内层修饰的覆盖。实际使用的是之前保存的快照了。

因此默认情况就只能  FastFail

官方并不建议设置 idempotent 为 true,因为直接返回原修饰类,本身也就违反静态工厂方法的语义。所以官方建议: <b>DO NOT</b> set, only when you know why.

ExecutorServiceTtlWrapper

该类并不需要多讲,本身与上文的 DelegatingExecutor 一样。

class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
    private final ExecutorService executorService;

    ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) {
        super(executorService);
        this.executorService = executorService;
    }

    @NonNull
    @Override
    public <T> Future<T> submit(@NonNull Callable<T> task) {
        return executorService.submit(TtlCallable.get(task));
    }
}

其余方法都是一样的做法。

从上文看到,实际 ThreadLocal 的线程传递的核心在于 TransmittableThreadLocal 和 Transmitter。

TransmittableThreadLocal

TransmittableThreadLocal 只继承了 InheritableThreadLocal 和实现了该框架提供的函数接口 TtlCopier。

因此 TransmittableThreadLocal 自身是一个 InheritableThreadLocal,同样具备了线程创建时传递的特性。

其次,从类体系上看,TransmittableThreadLocal 自身是比较简单的,本质上只是为了让框架能够进行线程传递,做了一些小动作而已。

可以看到提供的方法是十分少的,源码行数总共也才不超过200行。

首先说一下构造函数。

private final boolean disableIgnoreNullValueSemantics;

public TransmittableThreadLocal() {
    this(false);
}

public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
    this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
}

一共两个构造函数,有参构造函数允许设置 “是否禁用忽略空值语义”。默认是开启的,表现行为是如果是 null 值,那么 TransmittableThreadLocal 是不会传递这个值,并且如果 set null,同时执行 remove 操作。表达的意思就是,“我不要 null,不归我管。你敢给我,我就再也不管你了“。

这样设计可能是因为一开始设计服务于业务,是希望业务不要通过 NULL 来表达任何含义,同时避免 NPE 和优化 GC。但是后来官方考虑到作为一个基础服务框架,应该尽量保证完整的语义。毕竟这样的特性是 JDK 的 ThreadLocal 不兼容的。因此后来,官方为了保证兼容性,加了控制参数,允许禁用该特性。

TtlCopier

TransmittableThreadLocal 实现了一个类,TtlCopier。顾名思义,该类定义了线程传递时,值复制的抽象语义。

public interface TtlCopier<T> {
    T copy(T parentValue);
}

而 TransmittableThreadLocal 的默认实现是与 InheritableThreadLocal 相同的,返回值的引用。

public T copy(T parentValue) {
    return parentValue;
}

同时,该接口也为业务方留下了扩展点。开发者可以重写该方法,来定义线程传递时,如何进行值的复制。

TransmittableThreadLocal 内部维护了一个非常关键的属性,用来注册项目中维护的 TransmittableThreadLocal,从而保证 Transmitter 去正确传递 ThreadLocal 的值。

private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
        new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
            }

            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
            }
        };

holder 是一个 InheritableThreadLocal,用来保存所有注册的 TransmittableThreadLocal。父子线程传递时,可以直接将父线程的注册表传递过来。使用 InheritableThreadLocal,主要保证了嵌套线程场景下,注册表的正确传递。官方有个 issue 以及为其 fix 的 release 版本,从 ThreadLocal 改成了 InheritableThreadLocal。嵌入Thread调用的bug

其次,存储的是  WeakHashMap ,value 都是无意义的 null,并且永远不会被使用。这样一来,保证项目使用 TransmittableThreadLocal 的话,不会引入新的内存泄漏问题。其内存泄漏的可能风险,就只完全来自于 InheritableThreadLocal 本身。

@Override
public final T get() {
    T value = super.get();
    if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
    return value;
}

@Override
public final void set(T value) {
    if (!disableIgnoreNullValueSemantics && null == value) {
        // may set null to remove value
        remove();
    } else {
        super.set(value);
        addThisToHolder();
    }
}

@Override
public final void remove() {
    removeThisFromHolder();
    super.remove();
}

@SuppressWarnings("unchecked")
private void addThisToHolder() {
    if (!holder.get().containsKey(this)) {
        holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
    }
}

private void removeThisFromHolder() {
    holder.get().remove(this);
}

get & set 会将当前的 TransmittableThreadLocal 注册到 holder 中, remove 时,会删除对应注册。

可以看到,前文说到的  disableIgnoreNullValueSemantics  的值在 get 和 set 时使用到。默认为 false 时,ThreadLocal 不会保存 null,holder 不会注册对应的 TransmittableThreadLocal。

TransmittableThreadLocal 就这样没了,可以看到就很简单。但是,线程传递的内容呢,为什么没有?

这是因为,TransmittableThreadLocal 将线程传递的所有工作全部委托给了其静态内部类 Transmitter。

Transmitter

我们讲到 TransmittableThreadLocal 会将有值的对象,注册到 holder 中,以便 Transmitter 去知道传递哪一些实例的值。但是如果这样,那不是都要修改代码,将项目中的 ThreadLocal 都改掉吗?

这当然不可能,因此 Transmitter 承担了这个任务,允许业务代码将原有的 ThreadLocal 注册进来,以方便 Transmitter 来识别和传递。

// 注册 ThreadLocal 的 threadLocalHolder 依然是 WeakHashMap
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
// ThreadLocal 手动注册时用的锁
private static final Object threadLocalHolderUpdateLock = new Object();
// 标记 ThreadLocal 的值已清除,类似于设置一个 null
private static final Object threadLocalClearMark = new Object();
// 传递 TtlCopier,来确定 threadLocal 传递值的方式。默认是 引用传递,与 TransmittableThreadLocal 的 copy 一致。
public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier) {
    return registerThreadLocal(threadLocal, copier, false);
}

@SuppressWarnings("unchecked")
public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal) {
    // 默认是内部定义个 shadowCopier
    return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, false);
}

public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) {
    // 如果是 TransmittableThreadLocal,则没有必要再维护了。默认就实现了其的传递。
    if (threadLocal instanceof TransmittableThreadLocal) {
        logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!");
        return true;
    }
        
    synchronized (threadLocalHolderUpdateLock) {
        // force 为 false,则不会更新对应的 copier
        if (!force && threadLocalHolder.containsKey(threadLocal)) return false;
                // copy on write
        WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder);
        newHolder.put((ThreadLocal<Object>) threadLocal, (TtlCopier<Object>) copier);
        threadLocalHolder = newHolder;
        return true;
    }
}

public static <T> boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal<T> threadLocal, boolean force) {
    return registerThreadLocal(threadLocal, (TtlCopier<T>) shadowCopier, force);
}
// 清除 ThreadLocal 的注册
public static <T> boolean unregisterThreadLocal(@NonNull ThreadLocal<T> threadLocal) {
    if (threadLocal instanceof TransmittableThreadLocal) {
        logger.warning("unregister a TransmittableThreadLocal instance, this is unnecessary!");
        return true;
    }

    synchronized (threadLocalHolderUpdateLock) {
        if (!threadLocalHolder.containsKey(threadLocal)) return false;

        WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder);
        newHolder.remove(threadLocal);
        threadLocalHolder = newHolder;
        return true;
    }
}
// 默认实现的 TtlCopier,直接引用传递
private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
    @Override
    public Object copy(Object parentValue) {
        return parentValue;
    }
};

其实我自己有个想不明白的,既然已经用了 threadLocalHolderUpdateLock 做锁,为什么还要用 copy on write?GC 友好?mark 一下。

剩下的部分,就是 Transmitter 怎么传递 ThreadLocal 的值了。

实际就是三个步骤,capture -> reply -> restore,crr。

1.抓取当前线程的值快照

// 快照类,用来保存当前线程的 TtlThreadLocal 和 ThreadLocal 的快照
private static class Snapshot {
    final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;

    private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
        this.ttl2Value = ttl2Value;
        this.threadLocal2Value = threadLocal2Value;
    }
}

public static Object capture() {
    // 抓取快照
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 抓取 TransmittableThreadLocal 的快照
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
    // 从 TransmittableThreadLocal 的 holder 中,遍历所有有值的 TransmittableThreadLocal,将 TransmittableThreadLocal 取出和值复制到 Map 中。
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
        ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
}

//  抓取注册的 ThreadLocal。
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
    // 从 threadLocalHolder 中,遍历注册的 ThreadLocal,将 ThreadLocal 和 TtlCopier 取出,将值复制到 Map 中。
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        final TtlCopier<Object> copier = entry.getValue();

        threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
}

2.将快照重做到执行线程

@NonNull
public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

// 重播 TransmittableThreadLocal,并保存执行线程的原值
@NonNull
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // 遍历 holder,从 父线程继承过来的,或者之前注册进来的
        backup.put(threadLocal, threadLocal.get());

        // clear the TTL values that is not in captured
        // avoid the extra TTL values after replay when run task
        // 清除本次没有传递过来的 ThreadLocal,和对应值。毕竟一是可能会有因为 InheritableThreadLocal 而传递并保留的值。二来保证主线程 set 过的 ThreadLocal,不应该被传递过来。明确,其传递是由业务代码控制的,就是明确 set 过值的。
        if (!captured.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // 将 map 中的值,设置到 ThreadLocal 中。
    setTtlValuesTo(captured);

    // TransmittableThreadLocal 的回调方法,在任务执行前执行。
    doExecuteCallback(true);

    return backup;
}

private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        // set 的同时,也就将 TransmittableThreadLocal 注册到当前线程的注册表了。
        threadLocal.set(entry.getValue());
    }
}

private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) {
    final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>();

    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        backup.put(threadLocal, threadLocal.get());

        final Object value = entry.getValue();
        // 如果值是标记已删除,则清除
        if (value == threadLocalClearMark) threadLocal.remove();
        else threadLocal.set(value);
    }

    return backup;
}

doExecuteCallback 是 TransmittableThreadLocal 定义的回调方法,保证任务执行前和执行后的回调动作。

isBefore 控制是执行前还是执行后。

内部调用了 beforeExecute 和 afterExecute 方法。默认是不做任何动作。

private static void doExecuteCallback(boolean isBefore) {
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
        try {
            if (isBefore) threadLocal.beforeExecute();
            else threadLocal.afterExecute();
        } catch (Throwable t) {
            // 忽略所有异常,保证任务的执行
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
            }
        }
    }
}
protected void beforeExecute() {
}

protected void afterExecute() {
}

3.恢复备份的原快照

public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}

private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // call afterExecute callback 任务执行完回调
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // clear the TTL values that is not in backup
        // avoid the extra TTL values after restore
        // 恢复快照时,清除本次传递注册进来,但是原先不存在的 TransmittableThreadLocal
        if (!backup.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // restore TTL values
    // 恢复快照中的 value 到 TransmittableThreadLocal 中
    setTtlValuesTo(backup);
}

private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

对特殊场景以及 Lambda 的支持

Transmitter 定义了几个特殊场景下以及 Java 8 lambda 表达式的使用。

特殊场景就是指,执行前,清除当前执行线程 ThreadLocal 的值,包括 TtlThreadLocal 和注册 ThreadLocal 。

像一开始讲到的业务代码喜欢使用 Supplier,所以也对其做了支持。本质是为了简化工作。

不过,注意的是,快照的捕获则需要业务代码自己完成并传递。

public static <R> R runSupplierWithCaptured(@NonNull Object captured, @NonNull Supplier<R> bizLogic) {
    Object backup = replay(captured);
    try {
        return bizLogic.get();
    } finally {
        restore(backup);
    }
}

public static <R> R runSupplierWithClear(@NonNull Supplier<R> bizLogic) {
    Object backup = clear();
    try {
        return bizLogic.get();
    } finally {
        restore(backup);
    }
}

public static <R> R runCallableWithCaptured(@NonNull Object captured, @NonNull Callable<R> bizLogic) throws Exception {
    Object backup = replay(captured);
    try {
        return bizLogic.call();
    } finally {
        restore(backup);
    }
}

public static <R> R runCallableWithClear(@NonNull Callable<R> bizLogic) throws Exception {
    Object backup = clear();
    try {
        return bizLogic.call();
    } finally {
        restore(backup);
    }
}

简化方法,使用起来也就是:

// 线程A
Object captured = Transmitter.capture();

// 线程B
@Async
String result = runSupplierWithCaptured(captured, () -> {
  
     System.out.println("Hello");
     ...
     return "World";
});

否则只能按照全套流程了:

// 线程A
Object captured = Transmitter.capture();

// 线程B
@Async
String result = runSupplierWithCaptured(captured, () -> {
  
     System.out.println("Hello");
     ...
     return "World";
});
Object backup = Transmitter.replay(captured); // (2)
try {
    System.out.println("Hello");
    // ...
    return "World";
} finally {
    // restore the TransmittableThreadLocal of thread B when replay
    Transmitter.restore(backup); (3)

Clear

上面可以看到,一些方法是做了 clear 操作。

就是不依赖快照的捕获,将空值的快照信息,传递给重做方法执行,就能清除当前执行线程的值,并得到返回原值备份。

public static Object clear() {
    final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        // threadLocalClearMark 标记为未被传递和注册,更为合适,从而避免和 null 混淆。否则无法区分原有就是 null,还是未被注册
        threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }

    return replay(new Snapshot(ttl2Value, threadLocal2Value));
}

注意

如果注意到 TransmittableThreadLocal 是继承 InheritableThreadLocal,就应该知道,子线程创建时,值还是会被传递过去。这也就可能带来内存泄漏问题。

所以,同时提供 DisableInheritableThreadFactoryWrapper,以方便业务代码自定义线程池,禁止值的继承传递。

class DisableInheritableThreadFactoryWrapper implements DisableInheritableThreadFactory {
    private final ThreadFactory threadFactory;

    DisableInheritableThreadFactoryWrapper(@NonNull ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        // 调用了 Transmitter 的 clear 方法,在创建子线程前,清除当前线程的值,并保存下来
        final Object backup = clear();
        try {
            return threadFactory.newThread(r);
        } finally {
            // 创建完,再重新恢复。以此,避免了值的继承传递。
            restore(backup);
        }
    }

    @NonNull
    @Override
    public ThreadFactory unwrap() {
        return threadFactory;
    }
}

对于 1.8 特性,还提供了 ForkJoinWorkerThreadFactory 和 TtlForkJoinPoolHelper 等类的支持。

Java Agent 支持

避免代码改动的话,可以使用 Java Agent,来隐式替换 JDK 的相应类。对于 1.8 的 CompletableFuture 和 Stream,在底层通过对 ForkJoinPool 的支持,也做了透明支持。

总结

到此,TransmittableThreadLocal 的源码解析就结束了。核心源码是不是很简单?但是某些思想和考量还是很值得学习的。

ThreadLocal 的使用,本身类似于全局变量,而且是可修改的。一旦中间过程被修改,就无法保证整体流程的前后一致性。它将是一个隐藏的强依赖,一个可能被忽略、意想不到的坑。(我不承认,我在还原大佬的话。)

应该尽量避免在业务代码中使用的。 DO NOT use, only when you know why .

嗯,还有加上一句,让其他人也明白,文档务必齐全。(说实话,我挺想用英文的,想想算了)。

全文完

以下文章您可能也会感兴趣:

我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章