找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-5.AQS

Java源码分析系列笔记-5.AQS

晾棋砷 2025-6-23 18:25:18
目录

  • 1. 是什么
  • 2. 如何使用
  • 3. 原理分析

    • 3.1. 构造方法

      • 3.1.1. 由头尾节点和代表锁状态的字段组成
      • 3.1.2. Node是个双向队列节点

    • 3.2. 获取锁的逻辑

      • 3.2.1. 尝试获取锁
      • 3.2.2. 尝试获取锁失败,则加入AQS队列
      • 3.2.3. 阻塞等待,被唤醒后不停得抢占锁

        • 3.2.3.1. 判断是否需要阻塞当前线程
        • 3.2.3.2. 阻塞当前线程
        • 3.2.3.3. 抢占锁过程中发生异常,那么从阻塞队列中移除当前节点

      • 3.2.4. 恢复中断标记

    • 3.3. 释放锁的逻辑

      • 3.3.1. 尝试释放锁
      • 3.3.2. 释放锁成功则唤醒下一个节点的线程,让其继续抢占锁


  • 4. 总结
  • 5. 参考

1. 是什么

队列同步器,用于实现JUC包的其他并发工具类
2. 如何使用

一般我们不直接使用AQS,而是使用JUC中的其他工具类(如CountDownLatch等),这些工具类覆盖了几乎所有的使用场景,只有在这些工具类无法满足我们的需求时,才去用AQS实现自己的并发工具。
实现的一般的套路如下:

  • 定义并发工具类
  • 在并发工具类内部定义一个静态内部类,实现AQS,根据需要重写一些方法
  • 并发工具类中定义对外的方法,具体实现调用内部静态类的方法进行处理
如下代码为我们实现的简单的CountDownLatch
  1. public class BooleanLatch
  2. {
  3.     //定义内部静态类继承AQS
  4.     //重写对state操作的方法
  5.     private static class Sync extends AbstractQueuedSynchronizer
  6.     {
  7.         boolean isSignalled()
  8.         {
  9.             return getState() != 0;
  10.         }
  11.         protected int tryAcquireShared(int ignore)
  12.         {
  13.             return isSignalled() ? 1 : -1;
  14.         }
  15.         protected boolean tryReleaseShared(int ignore)
  16.         {
  17.             setState(1);
  18.             return true;
  19.         }
  20.     }
  21.     //对外暴露的是封装后的方法
  22.     private final Sync sync = new Sync();
  23.     public boolean isSignalled()
  24.     {
  25.         return sync.isSignalled();
  26.     }
  27.     public void signal()
  28.     {
  29.         sync.releaseShared(1);
  30.     }
  31.     public void await() throws InterruptedException
  32.     {
  33.         sync.acquireSharedInterruptibly(1);
  34.     }
  35.     public static void main(String[] args) throws InterruptedException
  36.     {
  37.         BooleanLatch booleanLatch = new BooleanLatch();
  38.         new Thread(()-> {
  39.             try
  40.             {
  41.                 TimeUnit.SECONDS.sleep(60);
  42.                 System.out.println(Thread.currentThread().getName() + "休眠结束");
  43.                 booleanLatch.signal();
  44.             }
  45.             catch (Exception e)
  46.             {
  47.                 e.printStackTrace();
  48.             }
  49.         }, "releaseThread").start();
  50.         booleanLatch.await();
  51.         System.out.println(Thread.currentThread().getName() + "继续运行");
  52.     }
  53. }
复制代码
3. 原理分析

可以先阅读手写AQS.md以便更好的理解AQS源码
3.1. 构造方法

3.1.1. 由头尾节点和代表锁状态的字段组成
  1. public abstract class AbstractQueuedSynchronizer
  2.     extends AbstractOwnableSynchronizer
  3.     implements java.io.Serializable {
  4.    
  5.     protected AbstractQueuedSynchronizer() { }
  6.         //使用双向队列保存抢占锁失败的线程
  7.     private transient volatile Node head;
  8.     private transient volatile Node tail;
  9.         //使用CAS操作volatile state字段,表示锁的状态
  10.         //state > 0表示获取了锁,state = 0表示释放了锁
  11.     private volatile int state;
  12. }
复制代码
3.1.2. Node是个双向队列节点
  1. static final class Node {
  2.         // 表示shared mode
  3.         static final Node SHARED = new Node();
  4.         // 表示exclusive mode
  5.         static final Node EXCLUSIVE = null;
  6.         
  7.         //当前线程被取消
  8.         static final int CANCELLED =  1;
  9.         //下一个节点需要unpark
  10.              static final int SIGNAL    = -1;
  11.         //Node的状态
  12.         volatile int waitStatus;
  13.                 //前一个节点
  14.         volatile Node prev;
  15.                 //后一个节点
  16.         volatile Node next;
  17.                 //该节点关联的线程
  18.         volatile Thread thread;
  19.         //
  20.         Node nextWaiter;
  21.        
  22.                 // 用于在share mode下创建占位符的头节点
  23.         Node() {   
  24.         }
  25.                 // Used by addWaiter
  26.         Node(Thread thread, Node mode) {   
  27.             this.nextWaiter = mode;
  28.             this.thread = thread;
  29.         }
  30.     }
复制代码
结构如下图:
1.png

3.2. 获取锁的逻辑
  1. public final void acquire(int arg) {
  2.         if (!tryAcquire(arg) &&
  3.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.             selfInterrupt();
  5.     }
复制代码
如果tryAcquire返回True,那么表示得到了锁并且不用执行后面的逻辑,acquire直接返回,此时当前线程没有入队
3.2.1. 尝试获取锁

让我们看看tryAcquire的实现
  1. protected boolean tryAcquire(int arg) {
  2.         throw new UnsupportedOperationException();
  3.     }
复制代码
what?居然就只是简单的抛出异常?其实这里运用了模板方法的设计模式,由子类决定具体实现,
如果tryAcquire返回False,便是没有获取锁,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))这段逻辑。
3.2.2. 尝试获取锁失败,则加入AQS队列

首先是addWaiter(Node.EXCLUSIVE),把当前线程以EXCLUSIVE构成node加入等待队列
  1. private Node addWaiter(Node mode) {
  2.     //用当前线程、EXCLUSIVE模式构造节点
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     // Try the fast path of enq; backup to full enq on failure
  5.     Node pred = tail;
  6.     //队列中尾节点不为空(即不是空队列)
  7.     //那么插入mode到队尾
  8.     if (pred != null) {
  9.         //mode(新节点)的prev指针指向尾节点
  10.         node.prev = pred;
  11.         //cas设置mode为新的尾节点
  12.         if (compareAndSetTail(pred, node)) {
  13.             //设置成功则修改旧尾节点的next为mode
  14.             pred.next = node;
  15.             return node;
  16.         }
  17.     }
  18.     //队列为空或者插入到队尾失败
  19.     enq(node);
  20.     return node;
  21. }
复制代码

  • 8-17行:快速尝试入队,如果队列中尾节点不为空(即不是空队列),则尝试把当前线程构造的节点加入队列的尾部
  • 19行:队列为空或者快速尝试入队失败,那么调用enq。enq代码如下:
  1. private Node enq(final Node node) {
  2.     for (;;) {
  3.         Node t = tail;
  4.         //队尾为空,说明队列为空需要初始化
  5.         if (t == null) { // Must initialize
  6.             //设置new Node为队头(这个头是个占位符)
  7.             if (compareAndSetHead(new Node()))
  8.                 tail = head;
  9.         //队列不为空,加入队列尾部
  10.         //下面的逻辑跟addWaiter的快速尝试差不多
  11.         } else {
  12.             node.prev = t;
  13.             if (compareAndSetTail(t, node)) {
  14.                 t.next = node;
  15.                 return t;
  16.             }
  17.         }
  18.     }
  19. }
复制代码

  • 2行:一直尝试,直到入队成功为止
  • 3-10:如果队列为空,那么使用CAS操作设置new Node为队列头节点(这个头节点是个占位符),接着回到2行for循环,继而执行下面的else逻辑
  • 11-16:队列不为空,使用CAS操作把当前节点加入到队列尾部
    CAS操作就是调用的是Unsafe类的方法,如下:
  1. private final boolean compareAndSetTail(Node expect, Node update) {
  2.                 //tail == expect?是的话更新tail为update
  3.     return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  4. }
复制代码
加入队尾成功后,接着执行acquireQueued方法
3.2.3. 阻塞等待,被唤醒后不停得抢占锁
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.         for (;;) {
  6.             //node就是addWaiter里加入到队尾的节点,p是node的前一个节点
  7.             final Node p = node.predecessor();
  8.             //逻辑1:
  9.             //p(前置节点)是head(占位符的头节点)的情况下才尝试获取锁。即当前节点前面没有人等待获取锁,换句话说当前节点就是实际的队头或者说等待时间最长的节点
  10.             if (p == head && tryAcquire(arg)) {
  11.                 //尝试获取锁成功后设置占位符头节点为node(当前节点)
  12.                 setHead(node);
  13.                 p.next = null; // help GC
  14.                 failed = false;
  15.                 return interrupted;
  16.             }
  17.             
  18.             //逻辑2:
  19.             //执行到这里说明上面的获取锁的条件不满足或者抢占锁失败
  20.             //那么需要判断是否需要阻塞,需要的话则进行阻塞
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt())
  23.                 interrupted = true;
  24.         }
  25.     } finally {
  26.         if (failed)
  27.             //何时执行这段逻辑?发生异常导致获取锁失败的时候
  28.             cancelAcquire(node);
  29.     }
  30. }
复制代码

  • 5行:死循环尝试直到获取锁成功
  • 6-17行:如果刚刚使用addWaiter入队尾的当前节点前面没有其他节点在等待,即我是等待时间最长的节点,那么尝试抢占锁。这里之所以还需要抢占是因为可以有其他线程(不是队列中的线程)同时进来抢占锁
  • 18-23行:抢占锁失败则需要阻塞。阻塞后什么时候被唤醒?当前节点(当前线程)的前序节点释放同步状态时,会唤醒该节点(该线程)(unparkSuccessor)。唤醒之后干什么?继续执行上面的逻辑1
3.2.3.1. 判断是否需要阻塞当前线程


  • shouldParkAfterFailedAcquire
  1. //根据(前一个节点,当前节点)->是否阻塞当前线程
  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  3.         int ws = pred.waitStatus;
  4.         //如果当前节点的前一个节点状态为SIGNAL
  5.         //表示它(前一个节点)承诺唤醒当前节点,那么可以放心阻塞,故直接返回ture
  6.         if (ws == Node.SIGNAL)
  7.             return true;
  8.             
  9.         //>0表示状态为取消的节点
  10.         //前置节点已被取消
  11.         if (ws > 0) {
  12.              //一直往前遍历找到没有取消的节点,并且遍历过程中把取消的节点删除
  13.                   do {
  14.                 node.prev = pred = pred.prev;
  15.             } while (pred.waitStatus > 0);
  16.             pred.next = node;
  17.         //前置节点未被取消(即为 0 或者 Node.PROPAGATE)
  18.         } else {
  19.             //CAS设置前置节点状态为SIGNAL,下次进来就是5-6行
  20.             //pred的waitStatus==ws?是的话更新waitStatus为Node.SIGNAL
  21.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  22.         }
  23.         return false;
  24.     }
复制代码

  • 6-7行:当前节点的前置节点状态为SIGNAL的时候,可以阻塞
3.2.3.2. 阻塞当前线程


  • parkAndCheckInterrupt
  1. private final boolean parkAndCheckInterrupt() {
  2.     //简单的调用LockSupport的park方法阻塞当前线程
  3.     LockSupport.park(this);
  4.     //返回是否被中断的标志
  5.     return Thread.interrupted();
  6. }
复制代码

  • LockSupport park
  1. public static void park(Object blocker) {
  2.     Thread t = Thread.currentThread();
  3.     //用当前AQS实例对象作为blocker,记录当前线程等待的对象(阻塞对象)
  4.     setBlocker(t, blocker);
  5.     //简单调用UNSAFE的park方法阻塞当前线程
  6.     UNSAFE.park(false, 0L);
  7.     setBlocker(t, null);
  8. }
复制代码
3.2.3.3. 抢占锁过程中发生异常,那么从阻塞队列中移除当前节点


  • cancelAcquire
[code]//node是addWaiter加入队尾的节点//这段取消的逻辑要做的就是从队列中删除该节点private void cancelAcquire(Node node) {    // 当前节点为空,那么不需要删除,直接返回    if (node == null)        return;    //1. node不再关联到任何线程    node.thread = null;    //2. 找到前置节点    //跳过被cancel的前继node,往前找到一个有效的前继节点pred    Node pred = node.prev;    while (pred.waitStatus > 0) //>0表示状态为取消的节点        node.prev = pred = pred.prev;//限制性pred=pred.prev;在执行node.prev=pred    //predNext是前置节点的下一个节点,注意这里可能不是当前node    Node predNext = pred.next;    //3. 将node的waitStatus置为CANCELLED    node.waitStatus = Node.CANCELLED;    //4. 从队列中删除该节点    //当前node是尾节点的删除操作:CAS设置队列的尾节点为pred    if (node == tail && compareAndSetTail(node, pred)) {        //CAS设置pred.next为null        compareAndSetNext(pred, predNext, null);        //当前node不是尾节点的删除操作    } else {                int ws;        //5. 下面的判断是指node既不是tail,又不是head的后继节点的情况        //当前节点的前置节点不是队头         //并且                 //要么 前置节点的状态为Node.SIGNAL(即承诺唤醒下一个节点)                //要么 前置节点的状态是正常的且CAS设置前置节点的状态为Node.SIGNAL成功 且                    //前置节点的线程为null        if (pred != head &&            ((ws = pred.waitStatus) == Node.SIGNAL ||             (ws
您需要登录后才可以回帖 登录 | 立即注册