找回密码
 立即注册
首页 业界区 业界 18.Java源码分析系列笔记-JDK1.8的ConcurrentHashMap ...

18.Java源码分析系列笔记-JDK1.8的ConcurrentHashMap

萨瑞饨 2025-9-25 20:59:09
目录

  • 1. 是什么
  • 2. 如何使用
  • 3. 原理分析

    • 3.1. 构造方法

      • 3.1.1. Node

    • 3.2. put方法【有加锁】

      • 3.2.1. 计算key的hash
      • 3.2.2. 死循环
      • 3.2.3. 第一次进来table为空,所以需要初始化table

        • 3.2.3.1. 使用CAS加锁防止多线程同时初始化table
        • 3.2.3.2. 其他线程让出CPU直到扩容完毕

      • 3.2.4. 第二次进来table不为空,链表肯定为空【头节点为空】,那么CAS设置头节点

        • 3.2.4.1. 获取第一个元素
        • 3.2.4.2. CAS设置头节点

      • 3.2.5. 第三次进来若链表不为空【头节点不为空】,那么对头节点加锁,使用链表的操作或树的操作插入

        • 3.2.5.1. 具体的插入操作
        • null

      • 3.2.6. 扩容的操作

    • 3.3. get方法【没有加锁】

      • 3.3.1. 计算key的hash值
      • 3.3.2. 获取第一个元素
      • 3.3.3. 第一个元素就是要找的节点
      • 3.3.4. 第一个元素不是要找的节点且hash=0说明是个链表那么遍历链表找到相等的节点

    • 3.4. remove方法【有加锁】

      • 3.4.1. 死循环
      • 3.4.2. table为空或者链表头节点为空,说明不存在那么返回空
      • 3.4.3. 头节点不为空先加锁,然后使用树或者链表的操作删除节点

        • 3.4.3.1. 链表的删除节点操作--更新指针


    • 3.5. containsKey方法【没有加锁】

  • 4. 参考链接

1. 是什么

线程安全的HashMap,底层使用sychronized+CAS+HashMap的结构(数组+链表+红黑树)实现
2. 如何使用
  1. public class ConcurrentHashMapTest
  2. {
  3.     public static void main(String[] args) throws InterruptedException
  4.     {
  5.         ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
  6.         Thread thread1 = new Thread(()->{
  7.             for (int i = 0; i < 100000; i++)
  8.             {
  9.                 map.put(i, i);
  10.             }
  11.         });
  12.         Thread thread2 = new Thread(()->{
  13.             for (int i = 100000; i < 200000; i++)
  14.             {
  15.                 map.put(i, i);
  16.             }
  17.         });
  18.         thread1.start();
  19.         thread2.start();
  20.         thread1.join();
  21.         thread2.join();
  22.         System.out.println(map);
  23.         System.out.println(map.size());
  24.         for (int i = 0; i < 200000; i++)
  25.         {
  26.             if (!map.contains(i))
  27.             {
  28.                 throw new RuntimeException("并发put有问题");//不会抛出异常说明并发put没问题
  29.             }
  30.             System.out.println(map.remove(i));
  31.         }
  32.     }
  33. }
复制代码
3. 原理分析

3.1. 构造方法
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
  2.     implements ConcurrentMap<K,V>, Serializable {
  3.     private static final long serialVersionUID = 7249069246763182397L;
  4.     //最大的数组长度。必须是2的次幂
  5.     private static final int MAXIMUM_CAPACITY = 1 << 30;
  6.     //默认的数组长度。必须是2的次幂
  7.     private static final int DEFAULT_CAPACITY = 16;
  8.     //默认的加载因子。
  9.     //当数组中有元素的entry的数量>=数组长度*LOAD_FACTOR时会进行扩容
  10.     private static final float LOAD_FACTOR = 0.75f;
  11.      //当链表(不包括头节点)中元素的数目为8的时候需要转成红黑树
  12.     static final int TREEIFY_THRESHOLD = 8;
  13.     //当红黑树(不包括头节点)中元素的数目为6的时候需要转成链表
  14.     static final int UNTREEIFY_THRESHOLD = 6;
  15.     //数组中entry的数目为64的才转换成红黑树
  16.     static final int MIN_TREEIFY_CAPACITY = 64;        
  17.     //-1表示正在初始化,或者是(-1+正在扩容的线程数)
  18.     //0或正数则代表hash表还未被初始化
  19.     private transient volatile int sizeCtl;
  20.     //使用volatile修饰Node数组,如果这个数组引用(不是内容)改变
  21.     //那么其他线程能立马感知(volatile的可见性)
  22.     //这个应该是扩容的时候修改Node数组会用到
  23.     transient volatile Node<K,V>[] table;
  24.     public ConcurrentHashMap() {
  25.     }
  26. }
复制代码
3.1.1. Node
  1. static class Node<K,V> implements Map.Entry<K,V> {
  2.     //final修饰key和hash表明这些是常量
  3.     //常量是线程安全的
  4.     final int hash;
  5.     final K key;
  6.     //val和next都用volatile修饰(可见性+有序性)
  7.     //配合CAS操作(原子性)就可以保证线程安全
  8.     //这也是get方法不用加锁的原因
  9.     volatile V val;
  10.     volatile Node<K,V> next;
  11.     Node(int hash, K key, V val, Node<K,V> next) {
  12.         this.hash = hash;
  13.         this.key = key;
  14.         this.val = val;
  15.         this.next = next;
  16.     }
  17. }
复制代码
3.2. put方法【有加锁】
  1. public V put(K key, V value) {
  2.     //把key和value传入putVal方法
  3.     return putVal(key, value, false);
  4. }
复制代码

  • putVal
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2.     if (key == null || value == null) throw new NullPointerException();//不允许插入null的key或者value
  3.     // (h ^ (h >>> 16)) & HASH_BITS(0x7fffffff)
  4.     int hash = spread(key.hashCode());
  5.     int binCount = 0;
  6.     //死循环配合cas
  7.     for (Node<K,V>[] tab = table;;) {
  8.         Node<K,V> f; int n, i, fh;
  9.         //如果table为空,第一次初始化table
  10.         if (tab == null || (n = tab.length) == 0)
  11.             tab = initTable();
  12.         //链表头节点为空,那么尝试cas设置头节点
  13.         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  14.             if (casTabAt(tab, i, null,
  15.                          new Node<K,V>(hash, key, value, null)))
  16.                 break;                   // cas设置头节点成功直接break
  17.         }
  18.         //有其他线程正在移动元素
  19.         else if ((fh = f.hash) == MOVED)
  20.             //协助其他线程扩容
  21.             tab = helpTransfer(tab, f);
  22.         //链表头节点不为空,走到这里发生了hash碰撞
  23.         else {
  24.             V oldVal = null;
  25.             //可能竞争很大,所以用synchronized加锁而不是cas
  26.             //相比于JDK7的这里锁的粒度更加小了,锁的粒度缩小为数组中每个链表的头节点
  27.             synchronized (f) {
  28.                 if (tabAt(tab, i) == f) {
  29.                     if (fh >= 0) {//头节点的hash>=0说明是个链表
  30.                         binCount = 1;
  31.                         //遍历链表,并且用bitCount计数链表中节点个数
  32.                         for (Node<K,V> e = f;; ++binCount) {
  33.                             K ek;
  34.                             //找到了相等的节点,那么保存旧val,并更新val,退出循环
  35.                             if (e.hash == hash &&
  36.                                 ((ek = e.key) == key ||
  37.                                  (ek != null && key.equals(ek)))) {
  38.                                 oldVal = e.val;
  39.                                 if (!onlyIfAbsent)
  40.                                     e.val = value;
  41.                                 break;
  42.                             }
  43.                             Node<K,V> pred = e;
  44.                             //到了尾节点,直接插入到末尾
  45.                             if ((e = e.next) == null) {
  46.                                 pred.next = new Node<K,V>(hash, key,
  47.                                                           value, null);
  48.                                 break;
  49.                             }
  50.                         }
  51.                     }
  52.                     //如果是树的节点,那么转调树
  53.                     else if (f instanceof TreeBin) {
  54.                         Node<K,V> p;
  55.                         binCount = 2;
  56.                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  57.                                                        value)) != null) {
  58.                             oldVal = p.val;
  59.                             if (!onlyIfAbsent)
  60.                                 p.val = value;
  61.                         }
  62.                     }
  63.                 }
  64.             }
  65.             //判断是否需要树化
  66.             if (binCount != 0) {
  67.                 if (binCount >= TREEIFY_THRESHOLD)
  68.                     treeifyBin(tab, i);
  69.                 if (oldVal != null)
  70.                     return oldVal;
  71.                 break;
  72.             }
  73.         }
  74.     }
  75.     //更新数量
  76.     addCount(1L, binCount);
  77.     return null;
  78. }
复制代码

  • 4行:计算key的hash,这里不是简单得使用key.hashCode方法
  • 7行:死循环直到成功
  • 10-11行:第一次进来table为空,所以需要初始化table
  • 13-17行:第二次进来table不为空,链表肯定为空【头节点为空】,那么设置CAS头节点
  • 22-72行:第三次进来若链表不为空【头节点不为空】,那么对头节点加锁,使用链表的操作或树的操作插入
  • 75行:数量+1并且判断是否需要扩容
3.2.1. 计算key的hash


  • spread
  1. static final int spread(int h) {
  2.     //通过把hashCode的高16位和低16位异或从而让每一位都参与运算减低hash碰撞的概率
  3.     //与HASH_BITS(0x7fffffff)相与保证不会出现负数?
  4.     return (h ^ (h >>> 16)) & HASH_BITS;
  5. }
复制代码
3.2.2. 死循环
  1. for (Node<K,V>[] tab = table;;) {
  2. //....
  3. }
复制代码
3.2.3. 第一次进来table为空,所以需要初始化table
  1. //如果table为空,第一次初始化table
  2. if (tab == null || (n = tab.length) == 0)
  3.     tab = initTable();
复制代码

  • initTable
  1. private final Node<K,V>[] initTable() {
  2.     Node<K,V>[] tab; int sc;
  3.     //这个也是个死循环
  4.     while ((tab = table) == null || tab.length == 0) {
  5.             //sizeCtl<0表示有其他线程正在初始化或者扩容
  6.         if ((sc = sizeCtl) < 0)
  7.                 //让出cpu,让扩容或者初始化的线程执行
  8.             Thread.yield(); // lost initialization race; just spin
  9.         //当前线程尝试修改sizeCtl为-1(表示正在初始化数组),成功后进入扩容逻辑
  10.         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  11.             try {
  12.                     //第一次初始化
  13.                 if ((tab = table) == null || tab.length == 0) {
  14.                     //容量为16
  15.                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  16.                     @SuppressWarnings("unchecked")
  17.                     //创建长度为n的Node数组
  18.                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  19.                     table = tab = nt;
  20.                     //sizeCtl为8
  21.                     sc = n - (n >>> 2);
  22.                 }
  23.             } finally {
  24.                 sizeCtl = sc;
  25.             }
  26.             break;
  27.         }
  28.     }
  29.     return tab;
  30. }
复制代码
3.2.3.1. 使用CAS加锁防止多线程同时初始化table
  1. //当前线程正在尝试修改sizeCtl,成功后进入扩容逻辑
  2. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  3.     try {
  4.             //第一次初始化
  5.         if ((tab = table) == null || tab.length == 0) {
  6.             //容量为16
  7.             int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  8.             @SuppressWarnings("unchecked")
  9.             //创建长度为n的Node数组
  10.             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  11.             table = tab = nt;
  12.             //sizeCtl为8
  13.             sc = n - (n >>> 2);
  14.         }
  15.     } finally {
  16.         sizeCtl = sc;
  17.     }
  18.     break;
  19. }
复制代码
3.2.3.2. 其他线程让出CPU直到扩容完毕
  1. while ((tab = table) == null || tab.length == 0) {
  2. //sizeCtl<0表示正在初始化或者扩容
  3. if ((sc = sizeCtl) < 0)
  4.         //让出cpu,让扩容或者初始化的线程执行
  5.     Thread.yield(); // lost initialization race; just spin
  6. }
复制代码
3.3.2. 获取第一个元素

首先通过(n - 1) & hash计算元素位置。n是2的次幂,n-1的话相当于最高位是0其余位都是1,hash与整个数相与结果跟对数组长度取模一样,只不过效率更高。
然后通过UNSAFE类的CAS操作拿到该位置的元素【每个元素都是一个链表的头节点或者红黑树的根节点】
  1. //链表头节点为空,
  2. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  3.     //那么尝试cas设置头节点
  4.     if (casTabAt(tab, i, null,
  5.                  new Node<K,V>(hash, key, value, null)))
  6.         break;                   // no lock when adding to empty bin
  7. }
复制代码
3.4.1. 死循环
  1. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  2.     //通过Unsafe类取的
  3.     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  4. }
复制代码
3.4.2. table为空或者链表头节点为空,说明不存在那么返回空
  1. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
  2.                                     Node<K,V> c, Node<K,V> v) {
  3.     //也是通过Unsafe设置的
  4.     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  5. }
复制代码
3.4.3. 头节点不为空先加锁,然后使用树或者链表的操作删除节点
  1. //链表头节点不为空
  2. else {
  3.     V oldVal = null;
  4.     //可能竞争很大,所以用synchronized加锁而不是cas
  5.     //相比于JDK7的这里锁的粒度更加小了,锁的粒度缩小为数组中每个链表的头节点
  6.     //JDK7的是对segment(类似于多个链表头节点)加锁
  7.     synchronized (f) {
  8.         //头节点确实没有变化--啥时候会变化?
  9.         if (tabAt(tab, i) == f) {
  10.             if (fh >= 0) {
  11.                 binCount = 1;
  12.                 //遍历链表,并且用bitCount计数链表中节点个数
  13.                 for (Node<K,V> e = f;; ++binCount) {
  14.                     K ek;
  15.                     //找到了相等的节点,那么保存旧val,并更新val,退出循环
  16.                     if (e.hash == hash &&
  17.                         ((ek = e.key) == key ||
  18.                          (ek != null && key.equals(ek)))) {
  19.                         oldVal = e.val;
  20.                         if (!onlyIfAbsent)
  21.                             e.val = value;
  22.                         break;
  23.                     }
  24.                     Node<K,V> pred = e;
  25.                     //到了尾节点,直接插入到末尾,退出循环
  26.                     if ((e = e.next) == null) {
  27.                         pred.next = new Node<K,V>(hash, key,
  28.                                                   value, null);
  29.                         break;
  30.                     }
  31.                 }
  32.             }
  33.             //如果是树的节点,那么转调树
  34.             else if (f instanceof TreeBin) {
  35.                 Node<K,V> p;
  36.                 binCount = 2;
  37.                 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  38.                                                value)) != null) {
  39.                     oldVal = p.val;
  40.                     if (!onlyIfAbsent)
  41.                         p.val = value;
  42.                 }
  43.             }
  44.         }
  45.     }
  46.     //判断是否需要树化
  47.     if (binCount != 0) {
  48.         if (binCount >= TREEIFY_THRESHOLD)
  49.             treeifyBin(tab, i);
  50.         if (oldVal != null)
  51.             return oldVal;
  52.         break;
  53.     }
  54. }
复制代码
3.4.3.1. 链表的删除节点操作--更新指针
  1. private final void addCount(long x, int check) {
  2.     CounterCell[] as; long b, s;
  3.     if ((as = counterCells) != null ||
  4.         !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  5.         CounterCell a; long v; int m;
  6.         boolean uncontended = true;
  7.         if (as == null || (m = as.length - 1) < 0 ||
  8.             (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
  9.             !(uncontended =
  10.               U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
  11.             fullAddCount(x, uncontended);
  12.             return;
  13.         }
  14.         if (check <= 1)
  15.             return;
  16.         s = sumCount();
  17.     }
  18.     if (check >= 0) {
  19.         Node<K,V>[] tab, nt; int n, sc;
  20.         while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
  21.                (n = tab.length) < MAXIMUM_CAPACITY) {
  22.             int rs = resizeStamp(n);
  23.             if (sc < 0) {
  24.                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  25.                     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  26.                     transferIndex <= 0)
  27.                     break;
  28.                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  29.                     transfer(tab, nt);
  30.             }
  31.             else if (U.compareAndSwapInt(this, SIZECTL, sc,
  32.                                          (rs << RESIZE_STAMP_SHIFT) + 2))
  33.                 transfer(tab, null);
  34.             s = sumCount();
  35.         }
  36.     }
  37. }
复制代码

  • setTabAt
[code]static final  void setTabAt(Node[] tab, int i, Node v) {    U.putObjectVolatile(tab, ((long)i

相关推荐

前天 00:41

举报

这个有用。
您需要登录后才可以回帖 登录 | 立即注册