找回密码
 立即注册
首页 业界区 业界 Netty源码—9.性能优化和设计模式

Netty源码—9.性能优化和设计模式

祖娅曦 2025-6-3 13:33:48
大纲
1.Netty的两大性能优化工具
2.FastThreadLocal的实现之构造方法
3.FastThreadLocal的实现之get()方法
4.FastThreadLocal的实现之set()方法
5.FastThreadLocal的总结
6.Recycler的设计理念
7.Recycler的使用
8.Recycler的四个核心组件
9.Recycler的初始化
10.Recycler的对象获取
11.Recycler的对象回收
12.异线程收割对象
13.Recycler的总结
14.Netty设计模式之单例模式
15.Netty设计模式之策略模式
16.Netty设计模式之装饰器模式
17.Netty设计模式之观察者模式
18.Netty设计模式之迭代器模式
19.Netty设计模式之责任链模式
 
1.Netty的两大性能优化工具
(1)FastThreadLocal
(2)Recycler
 
(1)FastThreadLocal
FastThreadLocal的作用与ThreadLocal相当,但比ThreadLocal更快。ThreadLocal的作用是多线程访问同一变量时能够通过线程本地化的方式避免多线程竞争、实现线程隔离。
 
Netty的FastThreadLocal重新实现了JDK的ThreadLocal的功能,且访问速度更快,但前提是使用FastThreadLocalThread线程。
 
(2)Recycler
Recycler实现了一个轻量级的对象池机制。对象池的作用是一方面可以实现快速创建对象,另一方面可以避免反复创建对象、减少YGC压力。
 
Netty使用Recycler的方式来获取ByteBuf对象的原因是:ByteBuf对象的创建在Netty里是非常频繁的且又比较占空间。但是如果对象比较小,使用对象池也不是那么划算。
 
2.FastThreadLocal的实现之构造方法
Netty为FastThreadLocal量身打造了FastThreadLocalThread和InternalThreadLocalMap两个重要的类,FastThreadLocal的构造方法会设置一个由final修饰的int型变量index,该index变量的值会由InternalThreadLocalMap的静态方法通过原子类来进行获取。
  1. //A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread.
  2. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable.  
  3. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently.
  4. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype.
  5. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason.
  6. //Note that the fast path is only possible on threads that extend FastThreadLocalThread,
  7. //because it requires a special field to store the necessary state.
  8. //An access by any other kind of thread falls back to a regular ThreadLocal.
  9. //@param <V> the type of the thread-local variable
  10. public class FastThreadLocal<V> {
  11.     //每个FastThreadLocal都有一个唯一的身份标识ID
  12.     private final int index;
  13.     //类初始化时调用,所以默认为variablesToRemoveIndex = 0
  14.     //第n个值存放在数组下标为n的位置,下标为0的位置存所有FastThreadLocal<V>
  15.     private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
  16.     public FastThreadLocal() {
  17.         index = InternalThreadLocalMap.nextVariableIndex();
  18.     }
  19.     ...
  20. }
  21. //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
  22. //Note that this class is for internal use only and is subject to change at any time.  
  23. //Use FastThreadLocal unless you know what you are doing.
  24. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
  25.     public static final Object UNSET = new Object();
  26.     public static int nextVariableIndex() {
  27.         int index = nextIndex.getAndIncrement();
  28.         if (index < 0) {
  29.             nextIndex.decrementAndGet();
  30.             throw new IllegalStateException("too many thread-local indexed variables");
  31.         }
  32.         return index;
  33.     }
  34.     ...
  35. }
  36. //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
  37. //Note that this class is for internal use only and is subject to change at any time.  
  38. //Use FastThreadLocal unless you know what you are doing.
  39. class UnpaddedInternalThreadLocalMap {
  40.     static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
  41.     static final AtomicInteger nextIndex = new AtomicInteger();
  42.     //Used by FastThreadLocal
  43.     Object[] indexedVariables;
  44.     ...
  45. }
复制代码
 
3.FastThreadLocal的实现之get()方法
(1)FastThreadLocalThread的关键属性
(2)Thread与ThreadLocalMap
(3)FastThreadLocal.get()方法的实现流程
(4)从InternalThreadLocalMap中获取值
 
(1)FastThreadLocalThread的关键属性
FastThreadLocal继承了Thread类,每个FastThreadLocalThread线程对应一个InternalThreadLocalMap实例。只有FastThreadLocal和FastThreadLocalThread线程组合在一起使用的时候才能发挥出FastThreadLocal的性能优势。
  1. public class FastThreadLocalThread extends Thread {
  2.     private InternalThreadLocalMap threadLocalMap;
  3.     public FastThreadLocalThread() { }
  4.     public FastThreadLocalThread(Runnable target) {
  5.         super(target);
  6.     }
  7.     public FastThreadLocalThread(ThreadGroup group, Runnable target) {
  8.         super(group, target);
  9.     }
  10.     public FastThreadLocalThread(String name) {
  11.         super(name);
  12.     }
  13.     public FastThreadLocalThread(ThreadGroup group, String name) {
  14.         super(group, name);
  15.     }
  16.     public FastThreadLocalThread(Runnable target, String name) {
  17.         super(target, name);
  18.     }
  19.     public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
  20.         super(group, target, name);
  21.     }
  22.     public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
  23.         super(group, target, name, stackSize);
  24.     }
  25.    
  26.     //Returns the internal data structure that keeps the thread-local variables bound to this thread.
  27.     //Note that this method is for internal use only, and thus is subject to change at any time.
  28.     public final InternalThreadLocalMap threadLocalMap() {
  29.         return threadLocalMap;
  30.     }
  31.    
  32.     //Sets the internal data structure that keeps the thread-local variables bound to this thread.
  33.     //Note that this method is for internal use only, and thus is subject to change at any time.
  34.     public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
  35.         //这个方法会在调用InternalThreadLocalMap.get()方法时被调用
  36.         //具体就是通过fastGet()方法设置FastThreadLocalThread一个新创建的InternalThreadLocalMap对象
  37.         this.threadLocalMap = threadLocalMap;
  38.     }
  39. }
复制代码
(2)Thread与ThreadLocalMap
注意:每个Thread线程对应一个ThreadLocalMap实例。ThreadLocal.ThreadLocalMap是采用线性探测法来解决哈希冲突的。
  1. public class Thread implements Runnable {
  2.     ...
  3.     //ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class.
  4.     ThreadLocal.ThreadLocalMap threadLocals = null;
  5.     ...
  6. }
  7. public class ThreadLocal<T> {
  8.     ...
  9.     //Returns the value in the current thread's copy of this thread-local variable.  
  10.     //If the variable has no value for the current thread,
  11.     //it is first initialized to the value returned by an invocation of the #initialValue method.
  12.     //@return the current thread's value of this thread-local
  13.     public T get() {
  14.         Thread t = Thread.currentThread();
  15.         ThreadLocalMap map = getMap(t);
  16.         if (map != null) {
  17.             ThreadLocalMap.Entry e = map.getEntry(this);
  18.             if (e != null) {
  19.                 @SuppressWarnings("unchecked")
  20.                 T result = (T)e.value;
  21.                 return result;
  22.             }
  23.         }
  24.         return setInitialValue();
  25.     }
  26.    
  27.     //ThreadLocalMap is a customized hash map suitable only for maintaining thread local values.
  28.     //No operations are exported outside of the ThreadLocal class.
  29.     //The class is package private to allow declaration of fields in class Thread.  
  30.     //To help deal with very large and long-lived usages, the hash table entries use WeakReferences for keys.
  31.     //However, since reference queues are not used,
  32.     //stale entries are guaranteed to be removed only when the table starts running out of space.
  33.     static class ThreadLocalMap {
  34.         //The entries in this hash map extend WeakReference, using its main ref field as the key (which is always a ThreadLocal object).
  35.         //Note that null keys (i.e. entry.get() == null) mean that the key is no longer referenced, so the entry can be expunged from table.  
  36.         //Such entries are referred to as "stale entries" in the code that follows.
  37.         static class Entry extends WeakReference<ThreadLocal<?>> {
  38.             //The value associated with this ThreadLocal.
  39.             Object value;
  40.             Entry(ThreadLocal<?> k, Object v) {
  41.                 super(k);
  42.                 value = v;
  43.             }
  44.         }
  45.         //The initial capacity -- MUST be a power of two.
  46.         private static final int INITIAL_CAPACITY = 16;
  47.       
  48.         //The table, resized as necessary.
  49.         //table.length MUST always be a power of two.
  50.         private Entry[] table;
  51.         //The number of entries in the table.
  52.         private int size = 0;
  53.         //The next size value at which to resize.
  54.         private int threshold; // Default to 0
  55.         ...
  56.     }
  57.     ...
  58. }
复制代码
(4)从InternalThreadLocalMap中获取值
InternalThreadLocalMap.get()方法会分别通过fastGet()和slowGet()来获取一个InternalThreadLocalMap对象。
 
如果当前线程是普通线程则调用slowGet()方法,让每个线程通过JDK的ThreadLocal来拿到一个InternalThreadLocalMap对象,所以如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的。
 
如果当前线程是FastThreadLocalThread线程则调用fastGet()方法,由于FastThreadLocalThread线程维护了一个InternalThreadLocalMap对象,所以fastGet()方法相当于直接从线程对象里把这个InternalThreadLocalMap拿出来。
 
注意:Reactor线程所创建的线程实体便是FastThreadLocalThread线程。
  1. //A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread.
  2. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable.  
  3. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently.
  4. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype.
  5. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason.
  6. //Note that the fast path is only possible on threads that extend FastThreadLocalThread,
  7. //because it requires a special field to store the necessary state.
  8. //An access by any other kind of thread falls back to a regular ThreadLocal.
  9. //@param <V> the type of the thread-local variable
  10. public class FastThreadLocal<V> {
  11.     //每个FastThreadLocal都有一个唯一的身份标识ID
  12.     //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置
  13.     private final int index;
  14.    
  15.     //类初始化时调用,所以默认为variablesToRemoveIndex = 0
  16.     //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V>
  17.     private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
  18.     public FastThreadLocal() {
  19.         //每new一个FastThreadLocal,index就会自增1,所以index是FastThreadLocal的唯一身份ID
  20.         index = InternalThreadLocalMap.nextVariableIndex();
  21.     }
  22.    
  23.     //Returns the current value for the current thread
  24.     @SuppressWarnings("unchecked")
  25.     public final V get() {
  26.         //首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap
  27.         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
  28.         //从数组中取出index位置的元素
  29.         Object v = threadLocalMap.indexedVariable(index);
  30.         //如果获取到的元素不是一个UNSET即一个new Object(),则返回该元素
  31.         if (v != InternalThreadLocalMap.UNSET) {
  32.             return (V) v;
  33.         }
  34.         //如果获取到的数组元素是缺省对象,则对threadLocalMap在index位置的元素值执行初始化操作
  35.         return initialize(threadLocalMap);
  36.     }
  37.    
  38.     private V initialize(InternalThreadLocalMap threadLocalMap) {
  39.         V v = null;
  40.         try {
  41.             //通过initialValue()方法对threadLocalMap在index位置的元素值进行初始化
  42.             //initialValue()方法可以被FastThreadLocal<V>的子类重写
  43.             v = initialValue();
  44.         } catch (Exception e) {
  45.             PlatformDependent.throwException(e);
  46.         }
  47.         //设置threadLocalMap数组在下标index处的元素值
  48.         threadLocalMap.setIndexedVariable(index, v);
  49.         addToVariablesToRemove(threadLocalMap, this);
  50.         return v;
  51.     }
  52.    
  53.     //Returns the initial value for this thread-local variable.
  54.     protected V initialValue() throws Exception {
  55.         return null;
  56.     }
  57.     private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
  58.         //获取threadLocalMap数组下标为0的元素
  59.         Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
  60.         Set<FastThreadLocal<?>> variablesToRemove;
  61.       
  62.         //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除
  63.         if (v == InternalThreadLocalMap.UNSET || v == null) {
  64.             //创建FastThreadLocal类型的Set集合
  65.             variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
  66.             //将variablesToRemove这个Set集合设置到数组下标为0的位置
  67.             threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
  68.         } else {
  69.             //强转获得Set集合
  70.             variablesToRemove = (Set<FastThreadLocal<?>>) v;
  71.         }
  72.         variablesToRemove.add(variable);
  73.     }
  74.     ...
  75. }
复制代码
InternalThreadLocalMap的indexedVariable()方法中的入参index索引,指的是每一个FastThreadLocal对象在JVM里的标识ID,通过自增的方式由原子类进行创建。
 
对于当前线程维护的InternalThreadLocalMap对象里的数组indexedVariables,可以通过下标方式indexedVariables[index]获取一个Object。
 
初始化InternalThreadLocalMap对象时,会初始化一个32个元素的indexedVariables数组,每一个元素都是UNSET值。
 
4.FastThreadLocal的实现之set()方法
(1)FastThreadLocal.set()方法的实现流程
(2)向InternalThreadLocalMap设置值
(3)InternalThreadLocalMap的扩容原理
 
(1)FastThreadLocal.set()方法的实现流程
首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap。然后通过唯一标识当前FastThreadLocal对象的索引index,给InternalThreadLocalMap数组中对应的位置设置值。接着将当前FastThreadLocal对象保存到待清理的Set中,如果设置的值是一个缺省的Object对象,则通过remove()方法删除InternalThreadLocalMap数组中对应位置的元素。
  1. //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
  2. //Note that this class is for internal use only and is subject to change at any time.  
  3. //Use FastThreadLocal unless you know what you are doing.
  4. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
  5.     public static final Object UNSET = new Object();
  6.     private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
  7.     ...
  8.     private InternalThreadLocalMap() {
  9.         //设置父类的成员变量indexedVariables的初始值
  10.         super(newIndexedVariableTable());
  11.     }
  12.     private static Object[] newIndexedVariableTable() {
  13.         //初始化一个32个元素的数组
  14.         Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
  15.         //每个元素都是UNSET值
  16.         Arrays.fill(array, UNSET);
  17.         return array;
  18.     }
  19.    
  20.     //index是当前访问的FastThreadLocal在JVM里的索引
  21.     //indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的
  22.     public Object indexedVariable(int index) {
  23.         Object[] lookup = indexedVariables;
  24.         //直接通过索引来取出对象
  25.         return index < lookup.length? lookup[index] : UNSET;
  26.     }
  27.    
  28.     public static InternalThreadLocalMap get() {
  29.         Thread thread = Thread.currentThread();
  30.         if (thread instanceof FastThreadLocalThread) {
  31.             return fastGet((FastThreadLocalThread) thread);
  32.         } else {
  33.             return slowGet();
  34.         }
  35.     }
  36.     private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
  37.         InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
  38.         if (threadLocalMap == null) {
  39.             thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
  40.         }
  41.         return threadLocalMap;
  42.     }
  43.     private static InternalThreadLocalMap slowGet() {
  44.         //如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的
  45.         //因为此时返回的是一个通过ThreadLocal维护的InternalThreadLocalMap对象
  46.         ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
  47.         InternalThreadLocalMap ret = slowThreadLocalMap.get();
  48.         if (ret == null) {
  49.             ret = new InternalThreadLocalMap();
  50.             slowThreadLocalMap.set(ret);
  51.         }
  52.         return ret;
  53.     }
  54.     ...
  55. }
  56. class UnpaddedInternalThreadLocalMap {
  57.     static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
  58.     static final AtomicInteger nextIndex = new AtomicInteger();
  59.     //Used by FastThreadLocal
  60.     Object[] indexedVariables;
  61.     ...
  62.     UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
  63.         this.indexedVariables = indexedVariables;
  64.     }
  65.     ...
  66. }
复制代码
Stack的结构如下图示,其中Thread A是同线程,Thread B、Thread C、Thread D是异线程。
1.png
(3)第二个核心组件是WeakOrderQueue
WeakOrderQueue的作用是用于存储其他线程(异线程)回收由当前线程所分配的对象,并且在合适的时机,Stack会从异线程的WeakOrderQueue中收割对象。
  1. public class FastThreadLocal<V> {
  2.     //每个FastThreadLocal都有一个唯一的身份标识ID
  3.     //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置
  4.     private final int index;
  5.    
  6.     //类初始化时调用,所以默认为variablesToRemoveIndex = 0
  7.     //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V>
  8.     private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
  9.     ...
  10.    
  11.     //Set the value for the current thread.
  12.     public final void set(V value) {
  13.         if (value != InternalThreadLocalMap.UNSET) {
  14.             InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
  15.             setKnownNotUnset(threadLocalMap, value);
  16.         } else {
  17.             remove();
  18.         }
  19.     }
  20.    
  21.     private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
  22.         //将当前FastThreadLocal对象对应的数据添加到当前线程维护的InternalThreadLocalMap中
  23.         if (threadLocalMap.setIndexedVariable(index, value)) {
  24.             //将当前FastThreadLocal对象保存到待清理的Set中
  25.             addToVariablesToRemove(threadLocalMap, this);
  26.         }
  27.     }
  28.    
  29.     private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
  30.         //获取threadLocalMap数组下标为0的元素
  31.         Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
  32.         Set<FastThreadLocal<?>> variablesToRemove;
  33.       
  34.         //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除
  35.         if (v == InternalThreadLocalMap.UNSET || v == null) {
  36.             //创建FastThreadLocal类型的Set集合
  37.             variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
  38.             //将variablesToRemove这个Set集合设置到数组下标为0的位置
  39.             threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
  40.         } else {
  41.             //强转获得Set集合
  42.             variablesToRemove = (Set<FastThreadLocal<?>>) v;
  43.         }
  44.         variablesToRemove.add(variable);
  45.     }
  46.    
  47.     //Sets the value to uninitialized;
  48.     //a proceeding call to get() will trigger a call to initialValue().
  49.     public final void remove() {
  50.         remove(InternalThreadLocalMap.getIfSet());
  51.     }
  52.     //Sets the value to uninitialized for the specified thread local map;
  53.     //a proceeding call to get() will trigger a call to initialValue().
  54.     //The specified thread local map must be for the current thread.
  55.     @SuppressWarnings("unchecked")
  56.     public final void remove(InternalThreadLocalMap threadLocalMap) {
  57.         if (threadLocalMap == null) {
  58.             return;
  59.         }
  60.       
  61.         //删除数组下标index位置对应的value
  62.         Object v = threadLocalMap.removeIndexedVariable(index);
  63.       
  64.         //从数组下标0的位置取出Set集合,删除当前FastThreadLocal对象
  65.         removeFromVariablesToRemove(threadLocalMap, this);
  66.         if (v != InternalThreadLocalMap.UNSET) {
  67.             try {
  68.                 //和initValue()方法一样,可以被FastThreadLocal的子类重写
  69.                 onRemoval((V) v);
  70.             } catch (Exception e) {
  71.                 PlatformDependent.throwException(e);
  72.             }
  73.         }
  74.     }
  75.    
  76.     //Returns the initial value for this thread-local variable.
  77.     protected V initialValue() throws Exception {
  78.         return null;
  79.     }
  80.     //Invoked when this thread local variable is removed by #remove().
  81.     //Be aware that #remove() is not guaranteed to be called when the `Thread` completes which means
  82.     //you can not depend on this for cleanup of the resources in the case of `Thread` completion.
  83.     protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception {
  84.       
  85.     }
  86.     ...
  87. }
复制代码
(4)第三个核心组件是Link
每个WeakOrderQueue中都包含一个Link链表。回收对象都会被存放在Link链表的结点上,每个Link结点默认存储16个对象。当每个Link结点存储满了,会创建新的Link结点并放入链表尾部。
 
(5)第四个核心组件是DefaultHandle
一个对象在发起回收时需要调用DefaultHandle的recycle()方法进行具体的回收处理,这个要被回收的对象会保存在DefaultHandle中。
 
Stack和WeakOrderQueue都使用DefaultHandle来存储被回收的对象。在Stack和Link中都有一个elements数组,该数组保存的就是一个个的DefaultHandle实例。
  1. //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
  2. //Note that this class is for internal use only and is subject to change at any time.  
  3. //Use FastThreadLocal unless you know what you are doing.
  4. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
  5.     public static final Object UNSET = new Object();
  6.     ...
  7.     //index是当前访问的FastThreadLocal在JVM里的索引
  8.     //indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的
  9.     public Object indexedVariable(int index) {
  10.         Object[] lookup = indexedVariables;
  11.         //直接通过索引来取出对象
  12.         return index < lookup.length? lookup[index] : UNSET;
  13.     }
  14.    
  15.     //如果设置的是新值,则返回true;如果设置的是旧值,则返回false;
  16.     public boolean setIndexedVariable(int index, Object value) {
  17.         Object[] lookup = indexedVariables;
  18.         if (index < lookup.length) {
  19.             Object oldValue = lookup[index];
  20.             //直接将数组index位置的元素设置为value,时间复杂度为O(1)
  21.             lookup[index] = value;
  22.             return oldValue == UNSET;
  23.         } else {
  24.             //扩容数组
  25.             expandIndexedVariableTableAndSet(index, value);
  26.             return true;
  27.         }
  28.     }
  29.    
  30.     //通过无符号右移和位或运算实现2^n * 2,这与HashMap的扩容原理是一样的
  31.     private void expandIndexedVariableTableAndSet(int index, Object value) {
  32.         Object[] oldArray = indexedVariables;
  33.         final int oldCapacity = oldArray.length;
  34.         int newCapacity = index;//假设index = 16,也就是1000
  35.         newCapacity |= newCapacity >>>  1;//变为1100
  36.         newCapacity |= newCapacity >>>  2;//变为1111
  37.         newCapacity |= newCapacity >>>  4;//还是1111
  38.         newCapacity |= newCapacity >>>  8;//还是1111
  39.         newCapacity |= newCapacity >>> 16;//还是1111
  40.         newCapacity ++;//变为10000
  41.         Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
  42.         Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
  43.         newArray[index] = value;
  44.         indexedVariables = newArray;
  45.     }
  46.    
  47.     //根据不同的Thread返回InternalThreadLocalMap
  48.     public static InternalThreadLocalMap getIfSet() {
  49.         Thread thread = Thread.currentThread();
  50.         if (thread instanceof FastThreadLocalThread) {
  51.             return ((FastThreadLocalThread) thread).threadLocalMap();
  52.         }
  53.         return slowThreadLocalMap.get();
  54.     }
  55.     ...
  56. }
复制代码
9.Recycler的初始化
创建Recycler的方法是直接new一个Recycler。每个Recycler里都有一个threadLocal变量,即FastThreadLocal。所以每个Recycler里对于每个线程都会有一个Stack对象。调用Recycler的get()方法获取T类对象时,便会初始化一个Stack对象。
  1. public class RecycleTest {
  2.     private static final Recycler<User> RECYCLER = new Recycler<User>() {
  3.         protected User newObject(Handle<User> handle) {
  4.             return new User(handle);
  5.         }
  6.     };
  7.    
  8.     private static class User {
  9.         //创建Recycler.Handle<User>对象与User对象进行绑定
  10.         private final Recycler.Handle<User> handle;
  11.         public User(Recycler.Handle<User> handle) {
  12.             this.handle = handle;
  13.         }
  14.         public void recycle() {
  15.             handle.recycle(this);
  16.         }
  17.     }
  18.    
  19.     public static void main(String[] args) {
  20.         //1.从对象池中获取User对象
  21.         User user1 = RECYCLER.get();
  22.         //2.回收对象到对象池
  23.         user1.recycle();
  24.         //3.从对象池中获取对象
  25.         User user2 = RECYCLER.get();
  26.         System.out.println(user1 == user2);
  27.     }
  28. }
复制代码
(2)同线程回收对象
在当前线程创建对象,也在当前线程进行回收。
 
Stack的pushNow()方法的主要逻辑是:回收每8个对象中的一个,以及将回收的对象存放到Stack.elements数组时可能需要的扩容处理。
 
(3)异线程回收对象
在当前线程创建对象,但在其他线程进行回收。
 
说明一:
Stack.pushLater()方法中的DELAYED_RECYCLED是一个FastThreadLocal类型的对象,可以通过这个FastThreadLocal对象获取一个Map。
 
所以每个线程都会有一个Map,这个Map的key是Stack,这个Map的value是WeakOrderQueue。每个线程的Map表示的是对这个线程来说,不同的Stack对应着其他线程绑定的Stack,而由其他线程创建的对象会存放在其他线程绑定的Stack的WeakOrderQueue里。
 
使用Map结构是为了防止DELAYED_RECYCLED内存膨胀,使用Map结构也可以在当前线程中帮忙快速定位其他线程应该对应于当前线程Stack的哪个WeakOrderQueue。从"delayedRecycled.size() >= maxDelayedQueues"可以看出每个线程最多帮助2倍CPU核数的线程回收对象。
 
说明二:
在pushLater(item, currentThread)中,假设currentThread是线程2,里面的this是线程1的Stack。
 
那么"queue = delayedRecycled.get(this)"会去拿线程1对应的WeakOrderQueue。拿到后若"queue == null",那么就说明线程2从来没有回收过线程1的对象。
 
"queue = newWeakOrderQueue(thread)"会创建一个WeakOrderQueue,也就是去为线程2(thread)去分配一个线程1的Stack对应的WeakOrderQueue,然后通过delayedRecycled.put(this, queue)绑定线程1。
 
说明三:
WeakOrderQueue中有一个Link链表。一个Link的大小是16,每个元素都是一个DefaultHandle。为什么一个Link对应16个DefaultHandle,而不是一个Link对应一个DefaultHandle。
 
如果一个Link对应一个DefaultHandle,那么每次同线程去回收异线程都要创建一个Link对象。比如线程2去回收线程1创建的对象,每次都需要创建一个Link对象然后添加到Link链表中,并且每次都需要判断当前线程1是否还允许继续分配一个Link。
 
把多个DefaultHandle放在一个Link中的好处是不必每次都要判断当前线程2能否回收线程1的对象,只需要判断当前Link是否满了即可,这也体现了Netty进行优化处理的一个思路,也就是通过批量的方式减少某些操作的频繁执行。
 
说明四:
创建WeakOrderQueue对象时,也就是执行代码"new WeakOrderQueue(stack, thread)",使用的是头插法往当前线程的Stack添加WeakOrderQueue对象。
 
说明五:
WeakOrderQueue的add()方法添加对象到WeakOrderQueue时,首先会将tail结点的Link填满,然后再新创建一个Link插入到tail结点。其中LINK_CAPACITY表示Link的DefaultHandle数组的最大长度,默认是16。
 
12.异线程收割对象
说明一:
线程的Stack里有3指针:head、pre、cursor。往Stack中插入一个WeakOrderQueue都是往头部插入的(头插法)。head指向第一个WeakOrderQueue,cursor指向当前回收对象的WeakOrderQueue。
 
说明二:
scavenge()方法会从其他线程回收的对象中尝试转移对象。如果成功则返回,否则设置cursor为head,以便下次从头开始获取。
 
说明三:
scavengeSome()方法会首先判断cursor是否为null。如果cursor为null,则设置cursor为head结点。接着执行do while循环不断尝试从其他线程的WeakOrderQueue中,转移一些对象到当前线程的Stack的一个DefaultHandle数组中。
 
说明四:
cursor.transfer()方法会把WeakOrderQueue里的对象传输到当前线程的Stack的elements数组里。如果传输成功就结束do while循环,如果传输失败就获取cursor的下一个结点cursor.next继续处理。
 
说明五:
cursor.transfer()方法每次都只取WeakOrderQueue里的一个Link,然后传输Link里的elements数组元素到目标Stack的elements数组中。如果cursor = null或success = true,则do while循环结束。所有的WeakOrderQueue默认时最多总共可以有:2K / 16 = 128个Link。
  1. public abstract class Recycler<T> {
  2.     ...
  3.     private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;//Use 4k instances as default.
  4.     private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
  5.     private static final int MAX_SHARED_CAPACITY_FACTOR;
  6.     private static final int MAX_DELAYED_QUEUES_PER_THREAD;
  7.     private static final int LINK_CAPACITY;
  8.     private static final int RATIO;
  9.    
  10.     static {
  11.         int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
  12.             SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
  13.         if (maxCapacityPerThread < 0) {
  14.             maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
  15.         }
  16.         DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
  17.         MAX_SHARED_CAPACITY_FACTOR = max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2));
  18.         MAX_DELAYED_QUEUES_PER_THREAD = max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", NettyRuntime.availableProcessors() * 2));
  19.         LINK_CAPACITY = safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
  20.         RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
  21.         ...
  22.     }
  23.    
  24.     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
  25.         @Override
  26.         protected Map<Stack<?>, WeakOrderQueue> initialValue() {
  27.             return new WeakHashMap<Stack<?>, WeakOrderQueue>();
  28.         }
  29.     };
  30.    
  31.     private final int maxCapacityPerThread;
  32.     private final int maxSharedCapacityFactor;
  33.     private final int interval;
  34.     private final int maxDelayedQueuesPerThread;
  35.     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
  36.         @Override
  37.         protected Stack<T> initialValue() {
  38.             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread);
  39.         }
  40.         @Override
  41.         protected void onRemoval(Stack<T> value) {
  42.             //Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
  43.             if (value.threadRef.get() == Thread.currentThread()) {
  44.                 if (DELAYED_RECYCLED.isSet()) {
  45.                     DELAYED_RECYCLED.get().remove(value);
  46.                 }
  47.             }
  48.         }
  49.     };
  50.     protected Recycler() {
  51.         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
  52.     }
  53.     protected Recycler(int maxCapacityPerThread) {
  54.         this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
  55.     }
  56.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
  57.         this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
  58.     }
  59.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
  60.         interval = safeFindNextPositivePowerOfTwo(ratio);
  61.         if (maxCapacityPerThread <= 0) {
  62.             this.maxCapacityPerThread = 0;
  63.             this.maxSharedCapacityFactor = 1;
  64.             this.maxDelayedQueuesPerThread = 0;
  65.         } else {
  66.             this.maxCapacityPerThread = maxCapacityPerThread;
  67.             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
  68.             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
  69.         }
  70.     }
  71.     @SuppressWarnings("unchecked")
  72.     public final T get() {
  73.         if (maxCapacityPerThread == 0) {
  74.             return newObject((Handle<T>) NOOP_HANDLE);
  75.         }
  76.         Stack<T> stack = threadLocal.get();
  77.         DefaultHandle<T> handle = stack.pop();
  78.         if (handle == null) {
  79.             //创建一个DefaultHandle
  80.             handle = stack.newHandle();
  81.             //创建一个对象,并绑定这个DefaultHandle
  82.             handle.value = newObject(handle);
  83.         }
  84.         return (T) handle.value;
  85.     }
  86.     ...
  87. }
复制代码
 
13.Recycler的总结
(1)获取对象和回收对象的思路总结
(2)获取对象的具体步骤总结
(3)对象池的设计核心
 
(1)获取对象和回收对象的思路总结
对象池有两个重要的组成部分:Stack和WeakOrderQueue。
 
从Recycler获取对象时,优先从Stack中查找可用对象。如果Stack中没有可用对象,会尝试从WeakOrderQueue迁移一些对象到Stack中。
 
Recycler回收对象时,分为同线程对象回收和异线程对象回收这两种情况。同线程回收直接向Stack添加对象,异线程回收会向WeakOrderQueue中的最后一个Link添加对象。
 
同线程回收和异线程回收都会控制回收速率,默认每8个对象会回收一个,其他的全部丢弃。
 
(2)获取对象的具体步骤总结
如何从一个对象池里获取对象:
 
步骤一:首先通过FastThreadLocal方式拿到当前线程的Stack。
 
步骤二:如果这个Stack里的elements数组有对象,则直接弹出。如果这个Stack里的elements数组没有对象,则从当前Stack关联的其他线程的WeakOrderQueue里的Link结点的elements数组中转移对象,到当前Stack里的elements数组里。
 
步骤三:如果转移成功,那么当前Stack里的elements数组就有对象了,这时就可以直接弹出。如果转移失败,那么接下来就直接创建一个对象然后和当前Stack进行关联。
 
步骤四:关联之后,后续如果是当前线程自己进行对象回收,则将该对象直接存放到当前线程的Stack里。如果是其他线程进行对象回收,则将该对象存放到其他线程与当前线程的Stack关联的WeakOrderQueue里。
 
(3)对象池的设计核心
为什么要分同线程和异线程进行处理,并设计一套比较复杂的数据结构?因为对象池的使用场景一般是高并发的环境,希望通过对象池来减少对象的频繁创建带来的性能损耗。所以在高并发的环境下,从对象池中获取对象和回收对象就只能通过以空间来换时间的思路进行处理,而ThreadLocal恰好是通过以空间换时间的思路来实现的,因此引入了FastThreadLocal来管理对象池里的对象。但是如果仅仅使用FastThreadLocal管理同线程创建和回收的对象,那么并不能充分体现对象池的作用。所以通过FastThreadLocal获取的Stack对象,应该不仅可以管理同线程的对象,也可以管理异线程的对象。为此,Recycler便分同线程和异线程进行处理并设计了一套比较复杂的数据结构。
 
14.Netty设计模式之单例模式
(1)单例模式的特点
(2)单例模式的例子
(3)Netty中的单例模式
 
(1)单例模式的特点
一.一个类全局只有一个对象
二.延迟创建
三.避免线程安全问题
 
(2)单例模式的例子
  1. public abstract class Recycler<T> {
  2.     ...
  3.     private static final class Stack<T> {
  4.         //所属的Recycler
  5.         final Recycler<T> parent;
  6.         //所属线程的弱引用
  7.         final WeakReference<Thread> threadRef;
  8.         //异线程回收对象时,其他线程能保存的被回收对象的最大个数
  9.         final AtomicInteger availableSharedCapacity;
  10.         //WeakOrderQueue最大个数
  11.         private final int maxDelayedQueues;
  12.         //对象池的最大大小,默认最大为4K
  13.         private final int maxCapacity;
  14.         //存储缓存数据的数组
  15.         DefaultHandle<?>[] elements;
  16.         //缓存的DefaultHandle对象个数
  17.         int size;
  18.         //WeakOrderQueue链表的三个重要结点
  19.         private WeakOrderQueue cursor, prev;
  20.         private volatile WeakOrderQueue head;
  21.         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) {
  22.             this.parent = parent;
  23.             threadRef = new WeakReference<Thread>(thread);
  24.             this.maxCapacity = maxCapacity;
  25.             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
  26.             elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
  27.             this.interval = interval;
  28.             handleRecycleCount = interval;//Start at interval so the first one will be recycled.
  29.             this.maxDelayedQueues = maxDelayedQueues;
  30.         }
  31.         ...
  32.     }
  33.     ...
  34. }
复制代码
(3)Netty中的单例模式
Netty中的单例模式大都使用饿汉模式,比如ReadTimeoutException、MqttEncoder。
  1. public abstract class Recycler<T> {
  2.     private static final int LINK_CAPACITY;
  3.     static {
  4.         ...
  5.         LINK_CAPACITY = safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
  6.         ...
  7.     }
  8.     ...
  9.     //a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
  10.     //but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
  11.     private static final class WeakOrderQueue extends WeakReference<Thread> {
  12.         ...
  13.         static final class Link extends AtomicInteger {
  14.             final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
  15.             int readIndex;
  16.             Link next;
  17.         }
  18.         private static final class Head {
  19.             private final AtomicInteger availableSharedCapacity;
  20.             Link link;
  21.             ...
  22.         }
  23.         ...
  24.     }
  25.     ...
  26. }
复制代码
 
15.Netty设计模式之策略模式
(1)策略模式的特点
(2)策略模式的例子
(3)Netty中的策略模式
 
(1)策略模式的特点
一.封装一系列可相互替换的算法家族
二.动态选择某一个策略
 
(2)策略模式的例子
  1. public abstract class Recycler<T> {
  2.     ...
  3.     private static final class DefaultHandle<T> implements Handle<T> {
  4.         int lastRecycledId;
  5.         int recycleId;
  6.         boolean hasBeenRecycled;
  7.         Stack<?> stack;
  8.         Object value;
  9.         DefaultHandle(Stack<?> stack) {
  10.             this.stack = stack;
  11.         }
  12.         @Override
  13.         public void recycle(Object object) {
  14.             if (object != value) {
  15.                 throw new IllegalArgumentException("object does not belong to handle");
  16.             }
  17.             Stack<?> stack = this.stack;
  18.             if (lastRecycledId != recycleId || stack == null) {
  19.                 throw new IllegalStateException("recycled already");
  20.             }
  21.             stack.push(this);
  22.         }
  23.     }
  24.     ...
  25. }
复制代码
(3)Netty中的策略模式
Netty的DefaultEventExecutorChooserFactory中的newChooser()方法就可以动态选择某具体策略;EventExecutorChooser接口中的next()方法就有两种实现算法:PowerOfTwoEventExecutorChooser.next()和GenericEventExecutorChooser.next()。
  1. public abstract class Recycler<T> {
  2.     ...
  3.     private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
  4.         //在Recycler中调用threadLocal.get()时,便会触发调用这个initialValue()方法
  5.         @Override
  6.         protected Stack<T> initialValue() {
  7.             return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread);
  8.         }
  9.         @Override
  10.         protected void onRemoval(Stack<T> value) {
  11.             //Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
  12.             if (value.threadRef.get() == Thread.currentThread()) {
  13.                 if (DELAYED_RECYCLED.isSet()) {
  14.                     DELAYED_RECYCLED.get().remove(value);
  15.                 }
  16.             }
  17.         }
  18.     };
  19.    
  20.     private final int interval;
  21.     private final int maxCapacityPerThread;
  22.     private final int maxSharedCapacityFactor;
  23.     private final int maxDelayedQueuesPerThread;
  24.    
  25.     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
  26.         //默认是8,用于控制对象的回收比率
  27.         interval = safeFindNextPositivePowerOfTwo(ratio);
  28.         if (maxCapacityPerThread <= 0) {
  29.             this.maxCapacityPerThread = 0;
  30.             this.maxSharedCapacityFactor = 1;
  31.             this.maxDelayedQueuesPerThread = 0;
  32.         } else {
  33.             //对象池的最大大小,能存多少元素,默认4K
  34.             this.maxCapacityPerThread = maxCapacityPerThread;
  35.             //默认是2
  36.             this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
  37.             //默认2倍CPU核数
  38.             this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
  39.         }
  40.     }
  41.    
  42.     private static final class Stack<T> {
  43.         ...
  44.         //异线程回收对象时,其他线程能保存的被回收对象的最大个数
  45.         final AtomicInteger availableSharedCapacity;
  46.       
  47.         Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) {
  48.             ...
  49.             availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
  50.             ...
  51.         }
  52.         ...
  53.     }
  54.     ...
  55. }
复制代码
 
16.Netty设计模式之装饰器模式
(1)装饰器模式的特点
(2)装饰器模式的例子
(3)Netty中的装饰器模式
 
(1)装饰器模式的特点
一.装饰者和被装饰者继承自同一个接口
二.装饰者给被装饰者动态修改行为(丰富类的功能)
 
(2)装饰器模式的例子
  1. public abstract class Recycler<T> {
  2.     ...
  3.     public final T get() {
  4.         ...
  5.         //获取当前线程缓存的Stack
  6.         Stack<T> stack = threadLocal.get();
  7.         //从Stack中弹出一个DefaultHandle对象
  8.         DefaultHandle<T> handle = stack.pop();
  9.         if (handle == null) {
  10.             //创建一个DefaultHandle
  11.             handle = stack.newHandle();
  12.             //创建一个对象并保存到DefaultHandle
  13.             handle.value = newObject(handle);
  14.         }
  15.         return (T) handle.value;
  16.     }
  17.    
  18.     //由Recycler的子类来实现创建对象
  19.     protected abstract T newObject(Handle<T> handle);
  20.    
  21.     private static final class Stack<T> {
  22.         ...
  23.         DefaultHandle<T> pop() {
  24.             int size = this.size;
  25.             if (size == 0) {
  26.                 //尝试从其他线程回收的对象中转移一些到Stack的DefaultHandle数组中
  27.                 if (!scavenge()) {
  28.                     return null;
  29.                 }
  30.                 size = this.size;
  31.                 if (size <= 0) {
  32.                     return null;
  33.                 }
  34.             }
  35.             size --;
  36.             //将对象实例从DefaultHandle数组(elements)的栈顶弹出
  37.             DefaultHandle ret = elements[size];
  38.             elements[size] = null;
  39.             //As we already set the element[size] to null we also need to store the updated size before we do any validation.
  40.             //Otherwise we may see a null value when later try to pop again without a new element added before.
  41.             this.size = size;
  42.    
  43.             if (ret.lastRecycledId != ret.recycleId) {
  44.                 throw new IllegalStateException("recycled multiple times");
  45.             }
  46.             ret.recycleId = 0;
  47.             ret.lastRecycledId = 0;
  48.             return ret;
  49.         }
  50.         
  51.         DefaultHandle<T> newHandle() {
  52.             return new DefaultHandle<T>(this);
  53.         }
  54.         ...
  55.     }
  56.     ...
  57. }
复制代码
(3)Netty中的装饰器模式
Netty中的SimpleLeakAwareByteBuf、UnreleasableByteBuf、WrappedByteBuf便用到了装饰器模式。每次调用WrappedByteBuf的方法,都会委托到被装饰的ByteBuf。这个WrappedByteBuf其实是Netty里装饰ByteBuf的一个基类,它基本是直接使用了ByteBuf的方法。
 
WrappedByteBuf有两个子类:SimpleLeakAwareByteBuf和UnreleasableByteBuf。这两个子类都是装饰者,被装饰者都是ByteBuf,在它们的构造函数中传入。
  1. public abstract class Recycler<T> {
  2.     ...
  3.     public final T get() {
  4.         ...
  5.         //获取当前线程缓存的Stack
  6.         Stack<T> stack = threadLocal.get();
  7.         //从Stack中弹出一个DefaultHandle对象
  8.         DefaultHandle<T> handle = stack.pop();
  9.         if (handle == null) {
  10.             //创建一个DefaultHandle
  11.             handle = stack.newHandle();
  12.             //创建一个对象并保存到DefaultHandle
  13.             handle.value = newObject(handle);
  14.         }
  15.         return (T) handle.value;
  16.     }
  17.    
  18.     //从Recycler的get()方法可知:
  19.     //一个对象在创建时就会和一个新创建的DefaultHandle绑定
  20.     //而该DefaultHandle又会和创建该对象的线程绑定,因为DefaultHandle会和Stack绑定,而Stack又会绑定创建该对象的线程
  21.     private static final class DefaultHandle<T> implements Handle<T> {
  22.         ...
  23.         Stack<?> stack;
  24.         Object value;
  25.         DefaultHandle(Stack<?> stack) {
  26.             this.stack = stack;
  27.         }
  28.         @Override
  29.         public void recycle(Object object) {
  30.             ...
  31.             Stack<?> stack = this.stack;
  32.             stack.push(this);
  33.         }
  34.     }
  35.    
  36.     private static final class Stack<T> {
  37.         //一个线程调用Recycler.get()方法创建一个对象时,会先获取和该线程绑定的Stack
  38.         //然后由该Stack创建一个DefaultHandle,接着再创建一个对象并将对象保存到这个DefaultHandle中
  39.         final WeakReference<Thread> threadRef;
  40.         ...
  41.         //当一个对象调用它绑定的DefaultHandle进行回收时,可以通过threadRef取到创建它的线程
  42.         void push(DefaultHandle<?> item) {
  43.             Thread currentThread = Thread.currentThread();
  44.             if (threadRef.get() == currentThread) {
  45.                 //The current Thread is the thread that belongs to the Stack, we can try to push the object now.
  46.                 pushNow(item);
  47.             } else {
  48.                 //The current Thread is not the one that belongs to the Stack (or the Thread that belonged to the Stack was collected already),
  49.                 //we need to signal that the push happens later.
  50.                 pushLater(item, currentThread);
  51.             }
  52.         }
  53.      
  54.         private void pushNow(DefaultHandle<?> item) {
  55.             //防止被多次回收
  56.             if ((item.recycleId | item.lastRecycledId) != 0) {
  57.                 throw new IllegalStateException("recycled already");
  58.             }
  59.             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
  60.         
  61.             int size = this.size;
  62.             //通过dropHandle(item)来实现:回收每8个对象中的一个
  63.             if (size >= maxCapacity || dropHandle(item)) {
  64.                 //超出最大容量或被回收速率控制则不回收
  65.                 return;
  66.             }
  67.             if (size == elements.length) {
  68.                 //扩容
  69.                 elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
  70.             }
  71.             elements[size] = item;
  72.             this.size = size + 1;
  73.         }
  74.       
  75.         //回收每8个对象中的一个
  76.         boolean dropHandle(DefaultHandle<?> handle) {
  77.             if (!handle.hasBeenRecycled) {
  78.                 //interval默认是8
  79.                 if (handleRecycleCount < interval) {
  80.                     handleRecycleCount++;
  81.                     //Drop the object.
  82.                     return true;
  83.                 }
  84.                 handleRecycleCount = 0;
  85.                 handle.hasBeenRecycled = true;
  86.             }
  87.             return false;
  88.         }
  89.         private void pushLater(DefaultHandle<?> item, Thread thread) {
  90.             ...
  91.             //取出由当前线程帮助其他线程回收对象的缓存
  92.             Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
  93.             //取出对象绑定的Stack对应的WeakOrderQueue
  94.             WeakOrderQueue queue = delayedRecycled.get(this);
  95.             if (queue == null) {
  96.                 //最多帮助2 * CPU核数的线程回收对象
  97.                 if (delayedRecycled.size() >= maxDelayedQueues) {
  98.                     //WeakOrderQueue.DUMMY就是new一个WeakOrderQueue,表示无法再帮该Stack回收对象
  99.                     delayedRecycled.put(this, WeakOrderQueue.DUMMY);
  100.                     return;
  101.                 }
  102.                 //新建WeakOrderQueue
  103.                 if ((queue = newWeakOrderQueue(thread)) == null) {
  104.                     //drop object
  105.                     return;
  106.                 }
  107.                 delayedRecycled.put(this, queue);
  108.             } else if (queue == WeakOrderQueue.DUMMY) {
  109.                 //drop object
  110.                 return;
  111.             }
  112.             //添加对象到WeakOrderQueue的Link链表中
  113.             queue.add(item);
  114.         }
  115.      
  116.         //Allocate a new WeakOrderQueue or return null if not possible.
  117.         private WeakOrderQueue newWeakOrderQueue(Thread thread) {
  118.             return WeakOrderQueue.newQueue(this, thread);
  119.         }
  120.         ...
  121.     }
  122.    
  123.     //每个线程都有一个Map<Stack<?>, WeakOrderQueue>,存放了其他线程及其对应WeakOrderQueue的映射
  124.     private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
  125.         new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
  126.             @Override
  127.             protected Map<Stack<?>, WeakOrderQueue> initialValue() {
  128.                 return new WeakHashMap<Stack<?>, WeakOrderQueue>();
  129.             }
  130.         };
  131.     ...
  132.    
  133.     private static final class WeakOrderQueue extends WeakReference<Thread> {
  134.         ...
  135.         static final class Link extends AtomicInteger {
  136.             //一个大小为16的DefaultHandle数组
  137.             final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
  138.             int readIndex;
  139.             Link next;
  140.         }
  141.      
  142.         static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
  143.             //We allocated a Link so reserve the space
  144.             if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
  145.                 return null;
  146.             }
  147.             final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
  148.             stack.setHead(queue);
  149.             return queue;
  150.         }
  151.       
  152.         private WeakOrderQueue(Stack<?> stack, Thread thread) {
  153.             super(thread);
  154.             tail = new Link();
  155.             head = new Head(stack.availableSharedCapacity);
  156.             head.link = tail;
  157.             interval = stack.interval;
  158.             handleRecycleCount = interval;
  159.         }
  160.      
  161.         void add(DefaultHandle<?> handle) {
  162.             handle.lastRecycledId = id;
  163.             if (handleRecycleCount < interval) {
  164.                 handleRecycleCount++;
  165.                 return;
  166.             }
  167.             handleRecycleCount = 0;
  168.             Link tail = this.tail;
  169.             int writeIndex;
  170.             if ((writeIndex = tail.get()) == LINK_CAPACITY) {
  171.                 Link link = head.newLink();
  172.                 if (link == null) {
  173.                     return;
  174.                 }
  175.                 this.tail = tail = tail.next = link;
  176.                 writeIndex = tail.get();
  177.             }
  178.             tail.elements[writeIndex] = handle;
  179.             handle.stack = null;
  180.             tail.lazySet(writeIndex + 1);
  181.         }
  182.         ...
  183.     }
  184. }
复制代码
 
17.Netty设计模式之观察者模式
(1)观察者模式的特点
(2)观察者模式的例子
(3)Netty中的观察者模式
 
(1)观察者模式的特点
一.观察者和被观察者
二.观察者订阅消息,被观察者发布消息
三.订阅则能收到消息,取消订阅则不能收到消息
 
(2)观察者模式的例子
女神是被观察者,男孩、男人、老男人是观察者。第一步需要注册观察者到被观察者的一个列表中,第二步当被观察者触发某个动作后需遍历观察者列表执行观察者的方法。
  1. public abstract class Recycler<T> {
  2.     ...
  3.     private static final class Stack<T> {
  4.         private WeakOrderQueue cursor, prev;
  5.         private volatile WeakOrderQueue head;
  6.         ...
  7.         //尝试从其他线程回收的对象中转移一些到elements数组
  8.         private boolean scavenge() {
  9.             if (scavengeSome()) {
  10.                 return true;
  11.             }
  12.             prev = null;
  13.             cursor = head;
  14.             return false;
  15.         }
  16.         private boolean scavengeSome() {
  17.             WeakOrderQueue prev;
  18.             WeakOrderQueue cursor = this.cursor;
  19.             //首先判断cursor是否为null,如果cursor为null,则设置cursor为head结点
  20.             if (cursor == null) {
  21.                 prev = null;
  22.                 cursor = head;
  23.                 if (cursor == null) {
  24.                     return false;
  25.                 }
  26.             } else {
  27.                 prev = this.prev;
  28.             }
  29.             boolean success = false;
  30.             do {
  31.                 //从其他线程的WeakOrderQueue也就是cursor中,
  32.                 //转移一些对象到当前线程的Stack<T>的一个DefaultHandle数组中
  33.                 if (cursor.transfer(this)) {
  34.                     success = true;
  35.                     break;
  36.                 }
  37.                 WeakOrderQueue next = cursor.getNext();
  38.                 ...
  39.                 cursor = next;
  40.             } while (cursor != null && !success);
  41.             this.prev = prev;
  42.             this.cursor = cursor;
  43.             return success;
  44.         }
  45.     }
  46.    
  47.     private static final class WeakOrderQueue extends WeakReference<Thread> {
  48.         ...
  49.         //从当前的WeakOrderQueue中,转移一些对象到目标Stack<T>的一个DefaultHandle数组中
  50.         boolean transfer(Stack<?> dst) {
  51.             Link head = this.head.link;
  52.             if (head == null) {
  53.                 return false;
  54.             }
  55.             if (head.readIndex == LINK_CAPACITY) {
  56.                 if (head.next == null) {
  57.                     return false;
  58.                 }
  59.                 head = head.next;
  60.                 this.head.relink(head);
  61.             }
  62.             final int srcStart = head.readIndex;
  63.             int srcEnd = head.get();
  64.             final int srcSize = srcEnd - srcStart;
  65.             if (srcSize == 0) {
  66.                 return false;
  67.             }
  68.             final int dstSize = dst.size;
  69.             final int expectedCapacity = dstSize + srcSize;
  70.             if (expectedCapacity > dst.elements.length) {
  71.                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
  72.                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
  73.             }
  74.             if (srcStart != srcEnd) {
  75.                 final DefaultHandle[] srcElems = head.elements;
  76.                 final DefaultHandle[] dstElems = dst.elements;
  77.                 int newDstSize = dstSize;
  78.                 for (int i = srcStart; i < srcEnd; i++) {
  79.                     DefaultHandle<?> element = srcElems[i];
  80.                     if (element.recycleId == 0) {
  81.                         element.recycleId = element.lastRecycledId;
  82.                     } else if (element.recycleId != element.lastRecycledId) {
  83.                         throw new IllegalStateException("recycled already");
  84.                     }
  85.                     srcElems[i] = null;
  86.                     if (dst.dropHandle(element)) {
  87.                         //Drop the object.
  88.                         continue;
  89.                     }
  90.                     element.stack = dst;
  91.                     dstElems[newDstSize ++] = element;
  92.                 }
  93.                 if (srcEnd == LINK_CAPACITY && head.next != null) {
  94.                     //Add capacity back as the Link is GCed.
  95.                     this.head.relink(head.next);
  96.                 }
  97.                 head.readIndex = srcEnd;
  98.                 if (dst.size == newDstSize) {
  99.                     return false;
  100.                 }
  101.                 dst.size = newDstSize;
  102.                 return true;
  103.             } else {
  104.                 //The destination stack is full already.
  105.                 return false;
  106.             }
  107.         }
  108.         ...
  109.     }
  110.     ...
  111. }
复制代码
(3)Netty中的观察者模式
Netty的writeAndFlush()方法就是典型的观察者模式。Netty的Future或Promise模式实现了writeAndFlush()的异步化,并且每次写成功或者写失败都能收到回调。
 
我们在调用writeAndFlush()方法后,Netty会创建一个被观察者ChannelFuture。然后在调用channelFuture.addListener()方法时,其实就是往被观察者ChannelFuture中添加一系列的观察者。
  1. public class Singleton {
  2.     private volatile static Singleton singleton;
  3.     private Singleton() {
  4.    
  5.     }
  6.     public static Singleton getInstance() {
  7.         if (singleton == null) {
  8.             synchronized(Singleton.class) {
  9.                 if (singleton == null) {
  10.                     singleton = new Singleton();
  11.                 }
  12.             }
  13.         }
  14.         return singleton;
  15.     }
  16. }
复制代码
每一个writeAndFlush()方法被调用时都是从pipeline开始往前传播,也就是从tail结点开始执行writeAndFlush()方法并从后往前传播。tail结点的writeAndFlush()方法会去new一个Promise(),这个new Promise()就是创建一个被观察者DefaultChannelPromise。DefaultChannelPromise继承自ChannelPromise,ChannelPromise继承自ChannelFuture。
  1. public final class ReadTimeoutException extends TimeoutException {
  2.     public static final ReadTimeoutException INSTANCE =
  3.         PlatformDependent.javaVersion() >= 7 ? new ReadTimeoutException(true) : new ReadTimeoutException();
  4.     ReadTimeoutException() {
  5.     }
  6.    
  7.     private ReadTimeoutException(boolean shared) {
  8.         super(shared);
  9.     }
  10. }
  11. @ChannelHandler.Sharable
  12. public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
  13.     public static final MqttEncoder INSTANCE = new MqttEncoder();
  14.     private MqttEncoder() {
  15.                
  16.     }
  17.     ...
  18. }
复制代码
TailContext父类AbstractChannelHandlerContext的writeAndFlush()方法执行源码:
  1. public class Strategy {
  2.     private Cache cacheMemory = new CacheMemoryImpl();
  3.     private Cache cacheRedis = new CacheRedisImpl();
  4.    
  5.     public interface Cache {
  6.         boolean add(String key, Object object);
  7.     }
  8.    
  9.     public class CacheMemoryImpl implements Cache {
  10.         public boolean add(String key, Object object) {
  11.             //保存到Memory
  12.             return false;
  13.         }
  14.     }
  15.    
  16.     public class CacheRedisImpl implements Cache {
  17.         public boolean add(String key, Object object) {
  18.             //保存到Redis
  19.             return false;
  20.         }
  21.     }
  22.    
  23.     public Cache getCache(String key) {
  24.         if (key.length() < 10) {
  25.             return cacheRedis;
  26.         } else {
  27.             return cacheMemory;
  28.         }
  29.     }
  30. }
复制代码
DefaultPromise添加观察者和通知观察者的源码如下。注意:DefaultPromise.listeners是一个Object,第一次添加时listeners = listener,第二次添加时会将新增的和当前的listeners转为一个数组,然后再往数组里添加元素。
[code]public class DefaultPromise extends AbstractFuture implements Promise {    private Object listeners;    ...    //添加观察者    @Override    public Promise addListener(GenericFutureListener
您需要登录后才可以回帖 登录 | 立即注册