找回密码
 立即注册
首页 业界区 业界 从零开始实现简易版Netty(五) MyNetty FastThreadLocal ...

从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现

坟菊 昨天 19:51
从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现

1. ThreadLocal介绍

在上一篇博客中,lab4版本的MyNetty对事件循环中的IO写事件处理进行了优化,解决了之前版本无法进行大数据消息写出的问题。
按照计划,本篇博客中,lab5版本的MyNetty需要实现FastThreadLocal。由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。

  • lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
  • lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
  • lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现
  • lab4版本博客:从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现


在实现FastThreadLocal之前,我们先介绍一下java中的ThreadLocal。对ThreadLocal的工作原理和优缺点有所了解后,才能更好的去理解netty为什么要额外实现一个功能类似的FastThreadLocal。
ThreadLocal,顾名思义是用来存储线程本地变量的一个容器(叫ThreadLocalVariable更为贴切)。虽然形式上是对同一个变量进行操作,但底层每个线程都持有一个独属于本线程的变量副本,不同线程对ThreadLocal变量的增删改查操作都是彼此完全隔离的。


下面的demo程序中,t1和t2两个线程虽然对static的threadLocal变量进行了并发的set操作,但彼此之间所设置的值却并不会串,t1线程get到的永远是t1-set,t2线程get到的永远是t2-set。
  1. public class ThreadLocalDemo {
  2.     private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
  3.    
  4.     public static void main(String[] args) {
  5.         new Thread(() -> {
  6.             doSleep();
  7.             for(int i=0; i<100; i++){
  8.                 //设置t1线程中本地变量的值
  9.                 threadLocal.set("t1-set");
  10.                 //获取t1线程中本地变量的值
  11.                 System.out.println("t1线程局部变量的value : " + threadLocal.get() + " " + Thread.currentThread().getName());
  12.             }}, "t1").start();
  13.         new Thread(() -> {
  14.             doSleep();
  15.             for(int i=0; i<100; i++){
  16.                 //设置t2线程中本地变量的值
  17.                 threadLocal.set("t2=set");
  18.                 //获取t1线程中本地变量的值
  19.                 System.out.println("t2线程局部变量的value : " + threadLocal.get()  + " " + Thread.currentThread().getName());
  20.             }}, "t2").start();
  21.         LockSupport.park();
  22.     }
  23.     private static void doSleep(){
  24.         try {
  25.             Thread.sleep(1000L);
  26.         } catch (InterruptedException e) {
  27.             throw new RuntimeException(e);
  28.         }
  29.     }
  30. }
复制代码
threadLocal示意图

1.png



结合MyJdkThreadLocal的源码和threadLocal示意图,有几个关键点需要注意。

  • 对于相同的ThreadLocal对象,其作为key在不同的Thread专属的ThreadLocalMap中指向的并不是同一个value。通过这个设计ThreadLocal做到了线程本地变量的线程间隔离。
  • ThreadLocal可以通过withInitial方法或自定义子类对象的方式设置默认值,在未提前set时,get操作会返回默认值,并将ThreadLocal设置为默认值。
  • ThreadLocalMap中底层数组中存储的Entry节点对象有两个关键的成员变量,一个是作为key的ThreadLocal类型的对象,在构造方法中被设置为弱引用(WeakReference);另一个则是作为value的ThreadLocal实际维护的变量值,是普通的强引用。
  • 在set、get发现为null的slot时或者remove成功清理掉对应Entry时,ThreadLocalMap会启发式的对周围的slot进行检查,将Entry中key为null的Entry对象从底层数组table中移除。(这么做的具体原因在下文分析)
为什么Entry中要将对ThreadLocal变量的引用设置为弱引用(weakReference)?

假设一个场景,用户在方法中定义并使用了一个ThreadLocal的临时变量,方法中set了一个值后没有remove就返回了。方法返回后,ThreadLocal对象的直接引用无法再被访问,此时该ThreadLocal设置的value值便会留在当前线程的ThreadLocalMap中无法被主动删除。
当然,jdk的作者可以要求用户在完成对ThreadLocal的使用后必须正确的通过remove方法将其释放,否则用户自行承担内存泄露的风险(特别是在线程池等线程长期存活的场景下)。
但正如java作为一个拥有自动垃圾回收特性的语言一样,jdk的作者希望ThreadLocal变量能够和普通的临时变量一样,在函数返回时,临时变量引用的对象能够自动的被系统回收掉,而无需用户必须紧绷着神经小心翼翼的进行手动释放。
弱引用简单介绍

java中共设计了四种引用类型,按照引用强度的从高到低分别为强引用、软引用、弱引用和虚引用。


引用类型详情强引用 StrongReference任一强引用存在时,对象不会被gc回收。正常编码时的对象引用都是强引用软引用 SoftReference仅存在软引用以及强度更弱的引用时,当内存不足时便会对象便会被gc回收弱引用 WeakReference仅存在弱引用以及强度更多的应用时,当gc时对象便会被回收掉虚引用 PhantomReference仅存在虚引用时,等价于没有引用弱引用使用demo
  1. public interface ThreadLocalApi<T> {
  2.     void set(T value);
  3.     T get();
  4.     void remove();
  5. }
复制代码
  1. public class MySimpleThreadLocal<T> implements ThreadLocalApi<T> {
  2.     private final Map<Thread,T> threadLocalMap = new ConcurrentHashMap<>();
  3.     public void set(T value){
  4.         threadLocalMap.put(Thread.currentThread(),value);
  5.     }
  6.     public T get(){
  7.         return threadLocalMap.get(Thread.currentThread());
  8.     }
  9.     public void remove(){
  10.         threadLocalMap.remove(Thread.currentThread());
  11.     }
  12. }
复制代码
  1. public class MyJdkThread extends Thread {
  2.     private MyJdkThreadLocalMap myJdkThreadLocalMap;
  3.     public MyJdkThread(Runnable target) {
  4.         super(target);
  5.     }
  6.     public MyJdkThreadLocalMap getMyJdkThreadLocalMap() {
  7.         return myJdkThreadLocalMap;
  8.     }
  9.     public void setMyJdkThreadLocalMap(MyJdkThreadLocalMap myJdkThreadLocalMap) {
  10.         this.myJdkThreadLocalMap = myJdkThreadLocalMap;
  11.     }
  12. }
复制代码
  1. /**
  2. * 基本参考自jdk中的ThreadLocal类
  3. * */
  4. public class MyJdkThreadLocal<T> implements ThreadLocalApi<T> {
  5.     private final int threadLocalHashCode = generateNextHashCode();
  6.     private static final AtomicInteger nextHashCode = new AtomicInteger();
  7.     /**
  8.      * 对于二次幂扩容的map,冲突率最低的魔数
  9.      * */
  10.     private static final int HASH_INCREMENT = 0x61c88647;
  11.     /**
  12.      * 初始化时的值,默认是null
  13.      *
  14.      * 可以通过子类重写方法的方式自定义initialValue的返回值
  15.      * */
  16.     protected T initialValue() {
  17.         return null;
  18.     }
  19.     public static <S> MyJdkThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
  20.         return new MyJdkThreadLocal<S>(){
  21.             @Override
  22.             protected S initialValue() {
  23.                 return supplier.get();
  24.             }
  25.         };
  26.     }
  27.     public int getThreadLocalHashCode() {
  28.         return threadLocalHashCode;
  29.     }
  30.     @Override
  31.     public T get() {
  32.         Thread t = Thread.currentThread();
  33.         if(!(t instanceof MyJdkThread)){
  34.             // 简单起见,只支持MyJdkThread,不对其它类型的thread做兼容
  35.             throw new IllegalStateException("Not a MyJdkThread");
  36.         }
  37.         MyJdkThread myJdkThread = (MyJdkThread) t;
  38.         MyJdkThreadLocalMap myJdkThreadLocalMap = myJdkThread.getMyJdkThreadLocalMap();
  39.         if (myJdkThreadLocalMap != null) {
  40.             // 如果ThreadLocalMap存在,直接尝试获取当前的threadLocal对应的entry
  41.             MyJdkThreadLocalMap.Entry e = myJdkThreadLocalMap.getEntry(this);
  42.             if (e != null) {
  43.                 // 当前threadLocal在对应的thread的threadLocalMap中存在,则直接返回value值
  44.                 @SuppressWarnings("unchecked")
  45.                 T result = (T)e.value;
  46.                 return result;
  47.             }
  48.         }
  49.         // 走到这里有两种情况
  50.         // 1. myJdkThreadLocalMap == null, 当前thread没有初始化ThreadLocalMap
  51.         // 2. myJdkThreadLocalMap != null && threadLocalMap.getEntry == null
  52.         //    当前thread存在threadLocalMap,但是里面不存在当前threadLocal对应的entry
  53.         return setInitialValue(myJdkThread);
  54.     }
  55.     @Override
  56.     public void set(T value) {
  57.         Thread t = Thread.currentThread();
  58.         if(!(t instanceof MyJdkThread)){
  59.             // 简单起见,只支持MyJdkThread,不对其它类型的thread做兼容
  60.             throw new IllegalStateException("Not a MyJdkThread");
  61.         }
  62.         MyJdkThread myJdkThread = (MyJdkThread) t;
  63.         MyJdkThreadLocalMap myJdkThreadLocalMap = myJdkThread.getMyJdkThreadLocalMap();
  64.         if (myJdkThreadLocalMap == null) {
  65.             // set时当前thread还没有threadLocalMao,创建一个新的
  66.             myJdkThread.setMyJdkThreadLocalMap(new MyJdkThreadLocalMap());
  67.         }
  68.         // 将value值set进当前线程的threadLocalMap中
  69.         myJdkThread.getMyJdkThreadLocalMap().set(this,value);
  70.     }
  71.     @Override
  72.     public void remove() {
  73.         Thread t = Thread.currentThread();
  74.         if(!(t instanceof MyJdkThread)){
  75.             // 简单起见,只支持MyJdkThread,不对其它类型的thread做兼容
  76.             throw new IllegalStateException("Not a MyJdkThread");
  77.         }
  78.         MyJdkThread myJdkThread = (MyJdkThread) t;
  79.         MyJdkThreadLocalMap myJdkThreadLocalMap = myJdkThread.getMyJdkThreadLocalMap();
  80.         if (myJdkThreadLocalMap != null) {
  81.             myJdkThreadLocalMap.remove(this);
  82.         }
  83.     }
  84.     private T setInitialValue(MyJdkThread myJdkThread) {
  85.         T value = initialValue();
  86.         if (myJdkThread.getMyJdkThreadLocalMap() == null) {
  87.             // threadLocalMap是惰性加载的,按需创建(因为不是所有的thread都需要用到threadLocal,这样可以节约内存)
  88.             MyJdkThreadLocalMap myJdkThreadLocalMap = new MyJdkThreadLocalMap();
  89.             myJdkThread.setMyJdkThreadLocalMap(myJdkThreadLocalMap);
  90.         }
  91.         // 将将当前的threadLocal变量的初始化的值设置进去
  92.         myJdkThread.getMyJdkThreadLocalMap().set(this,value);
  93.         return value;
  94.     }
  95.     private static int generateNextHashCode() {
  96.         return nextHashCode.getAndAdd(HASH_INCREMENT);
  97.     }
  98. }
复制代码



  • 从上面的弱引用介绍中可以看出,当对象只存在弱引用时,gc时便会将其自动的回收掉。基于弱引用,ThreadLocal便能实现启发式的自动删除未被正确remove掉的ThreadLocal变量。
  • 结合上面的ThreadLocal示意图可以看到,正常情况下,一个ThreadLocal变量存在直接的强引用和来自ThreadLocalMap中Entry节点的弱引用。当用户不再使用对应的ThreadLocal对象,并且没有正确的remove时,threadLocal变量便只剩下一个弱引用了。
    当gc后,Entry所指向的ThreadLocal变量便会被成功的回收掉,对应Entry的get方法会返回null值。基于此,便能感知到该ThreadLocal虽然没有被用户remove掉,但实际上不会再被使用,可以将其value移除或者将该slot给其它ThreadLocal变量使用。
  • 通过Entry中对ThreadLocal的弱引用设计和启发式的检查机制,ThreadLocal比较巧妙的解决(大幅缓解)了用户在没有正确remove回收ThreadLocal变量时的内存泄露问题。
有了启发式的清除机制能完全避免内存泄露吗?

再深入的思考下,弱引用加启发式的清理机制能完全的避免内存泄露吗?答案是否定的。
从ThreadLocalMap的源码中可以看出,出于执行效率的考虑(保证get、set和remove方法O(1)的时间复杂度),启发式的检查仅仅会确认相邻的少部分slot,并不能保证能完全的清理掉全部的key为null的Entry。
在绝大多数场景中,ThreadLocal即使没有全部正确的remove掉,也不会有特别严重的问题。但在极端场景下,线程长时间存活且线程数较多,而ThreadLocal所维护对象又较大时,未被完全清理的ThreadLocal变量依然会由于未被及时回收而浪费大量内存。
因此,作为使用者,还是推荐在使用ThreadLocal时尽可能的编写完备的逻辑,在确定不再使用时将ThreadLocal对象尽早remove释放掉,而不要过分依赖jdk提供的自动回收功能。
为什么ThreadLocalMap采用开发地址法而不是拉链法解决哈希冲突?

个人认为,主要原因有二:

  • 开发地址法相比拉链法因为不需要维护链表节点,因此更加节约空间。但在哈希表很大时,hash冲突时寻找可用slot的cpu开销较大。
    但相比普通HashMap的使用场景,ThreadLocalMap中会维护的Entry数量不会特别多,所以带来的额外的哈希冲突时的开销不是什么大问题。
  • 比起依赖链表的拉链法,基于弱引用进行Entry自动回收的机制能更简单、高效的实现。
2. FastThreadLocal实现原理介绍

在第一节中,详细的分析了jdk的ThreadLocal的工作原理,似乎ThreadLocal已经足够高效和可靠了。那么netty为什么还要再造一个功能类似的FastThreadLocal呢?特别是还冠以了Fast的名字,FastThreadLocal究竟比ThreadLocal快在哪?
FastThreadLocal比ThreadLocal快在哪?


  • ThreadLocal中key是基于ThreadLocal对象的hash码得出的,hash码本质上是随机的,因此不同ThreadLocal对象其对应的slot插槽必然有概率相同而可能产生hash冲突。
    而Netty中的FastThreadLocal的key并不是基于hash码,而是一个并发安全且全局单调递增的整数。每一个ThreadLocal对象被创建时都会基于这个自增整数(nextIndex)获得一个全局唯一的整数值(index),并作为在FastThreadLocalMap中底层数组的下标索引值。
    因此FastThreadLocal不会出现hash冲突的问题,比起jdk的ThreadLocal大幅减少了冲突时遍历寻找空闲插槽的额外开销。
  • 上文提到,jdk的作者在设计ThreadLocal时,考虑到用户无法总是正确的remove回收掉ThreadLocal,因此实现了一套启发式的自动清理无效Entry的机制。
    而Netty中,创造FastThreadLocal的核心目的是提升netty自身使用ThreadLocal的场景时的效率。netty的作者能保证FastThreadLocal一定能在不使用时被remove掉,不存在内存泄露的风险,不需要额外的自动清理机制来兜底。
    相比ThreadLocal,减少了启发式检查自动回收机制的FastThreadLocal毫无疑问在get、set和remove时性能有较大提升。
MyFastThreadLocal实现源码(完全参考netty,但做了一定的简化)

[code]public class MyFastThreadLocal implements ThreadLocalApi {    private static final int variablesToRemoveIndex = 0;    /**     * threadLocal对象在线程对应的ThreadLocalMap的下标(构造函数中,对象初始化的时候就确定了)     * */    private final int index;    public MyFastThreadLocal() {        // 原子性自增,确保每一个FastThreadLocal对象都有独一无二的下标        index = MyFastThreadLocalMap.nextVariableIndex();    }    /**     * 删除与当前线程绑定的所有ThreadLocal对象     * */    @SuppressWarnings("unchecked")    public static void removeAll() {        // 获得与当前Thread绑定的ThreadLocalMap        MyFastThreadLocalMap threadLocalMap = MyFastThreadLocalMap.getIfSet();        if (threadLocalMap == null) {            // 没有初始化过,无事发生            return;        }        try {            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);            if (v != null && v != MyFastThreadLocalMap.UNSET) {                // ThreadLocalMap数组中下标为0的地方,固定放线程所属的全体FastThreadLocal集合                Set[] variablesToRemoveArray = variablesToRemove.toArray(new MyFastThreadLocal[0]);                for (MyFastThreadLocal tlv: variablesToRemoveArray) {                    tlv.remove(threadLocalMap);                }            }        } finally {            MyFastThreadLocalMap.remove();        }    }    @Override    public void remove() {        MyFastThreadLocalMap myFastThreadLocalMap = MyFastThreadLocalMap.getIfSet();        remove(myFastThreadLocalMap);    }    @SuppressWarnings("unchecked")    private void remove(MyFastThreadLocalMap threadLocalMap) {        if (threadLocalMap == null) {            return;        }        // 从ThreadLocalMap中删除        Object v = threadLocalMap.removeIndexedVariable(index);        removeFromVariablesToRemove(threadLocalMap, this);        if (v != MyFastThreadLocalMap.UNSET) {            try {                // threadLocal被删除时,供业务在子类中自定义的回调函数                onRemoval((V) v);            } catch (Exception e) {                throw new RuntimeException(e);            }        }    }    @SuppressWarnings("unchecked")    @Override    public final V get() {        // 获得当前线程所对应的threadLocalMap        MyFastThreadLocalMap threadLocalMap = MyFastThreadLocalMap.get();        // 基于index,以O(1)的效率精确的获得对应的threadLocalMap中的元素        Object v = threadLocalMap.indexedVariable(index);        if (v != MyFastThreadLocalMap.UNSET) {            // 不为null,返回            return (V) v;        }        // 为null,返回初始化的值        return initialize(threadLocalMap);    }    @Override    public final void set(V value) {        if (value != MyFastThreadLocalMap.UNSET) {            // 正常set值            MyFastThreadLocalMap threadLocalMap = MyFastThreadLocalMap.get();            if (threadLocalMap.setIndexedVariable(index, value)) {                // 如果之前的值是UNSET,把这个新的threadLocal加入到总的待删除集合中去                addToVariablesToRemove(threadLocalMap, this);            }        } else {            // 传的是UNSET逻辑上等于remove            remove(MyFastThreadLocalMap.getIfSet());        }    }    private V initialize(MyFastThreadLocalMap threadLocalMap) {        // 获得默认初始化的值(与jdk的ThreadLocal一样,可以通过子类来重写initialValue)        V v = initialValue();        threadLocalMap.setIndexedVariable(index, v);        addToVariablesToRemove(threadLocalMap, this);        return v;    }    protected V initialValue(){        return null;    }    @SuppressWarnings("unchecked")    private static void removeFromVariablesToRemove(MyFastThreadLocalMap threadLocalMap, MyFastThreadLocal variable) {        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);        if (v == MyFastThreadLocalMap.UNSET || v == null) {            return;        }        // 从FastThreadLocalMap数组起始处的Set中也移除掉FastThreadLocal变量        Set> variablesToRemove;        if (v == MyFastThreadLocalMap.UNSET || v == null) {            // 为null还未初始化,创建一个集合然后放到variablesToRemoveIndex位置上            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap());            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);        } else {            variablesToRemove = (Set
您需要登录后才可以回帖 登录 | 立即注册