今天泡杯茶,深入聊聊咱们的老朋友——ReentrantLock。平时用 synchronized 关键字挺顺手,但一旦想玩点高级的,比如公平锁、尝试获取锁、可中断获取锁,那就得请出 ReentrantLock 了。咱们不光要会用,还得掀开它的盖子,看看里面的发动机(AQS)是怎么转的。
为了让咱们的探索更有代入感,我先写一个最简单的使用示例作为我们的“地图”,然后咱们就跟着代码的调用链路,一步步“钻”进源码里去探险。
我们的探索地图:示例代码
- import java.util.concurrent.locks.ReentrantLock;
- public class ReentrantLockDemo {
- // 这就是我们今天要解剖的主角。默认是非公平锁。
- private final ReentrantLock lock = new ReentrantLock();
- public void doSomething() {
- // 第一站:获取锁
- lock.lock();
- try {
- // 临界区代码,同一时间只有一个线程能执行
- System.out.println(Thread.currentThread().getName() + " got the lock.");
- // 为了演示重入,我们调用另一个也需要锁的方法
- doSomethingElse();
- } finally {
- // 最后一站:释放锁
- // 一定要放在finally里,保证即使出异常也能释放锁,避免死锁。
- lock.unlock();
- }
- }
- public void doSomethingElse() {
- lock.lock(); // 同一个线程,再次获取锁 -> 重入
- try {
- System.out.println(Thread.currentThread().getName() + " got the lock again (reentrant).");
- } finally {
- lock.unlock(); // 释放重入的锁
- }
- }
- public static void main(String[] args) {
- final ReentrantLockDemo demo = new ReentrantLockDemo();
- // 创建几个线程来竞争锁
- for (int i = 0; i < 3; i++) {
- new Thread(() -> demo.doSomething(), "Thread-" + i).start();
- }
- }
- }
复制代码 好了,地图在手,天下我有。我们的探险路线非常清晰:lock.lock() -> 临界区 -> lock.unlock()。出发!
第一站:获取锁 - lock.lock()
当我们调用 lock.lock() 时,会发生什么呢?点进去看看:- // ReentrantLock.java
- public void lock() {
- sync.lock(); // 嚯,它直接把活儿委托给了内部类`sync`
- }
复制代码 这个 sync 是何方神圣?它在 ReentrantLock 构造的时候就初始化了:- // ReentrantLock.java
- private final Sync sync;
- public ReentrantLock() {
- sync = new NonfairSync(); // 默认是非公平锁
- }
复制代码 所以,sync.lock() 实际上调用的是 NonfairSync 类的 lock() 方法。咱们就看看非公平锁是怎么“抢”的。
非公平锁的“抢”锁行为 - NonfairSync.lock()
- // ReentrantLock.NonfairSync
- static final class NonfairSync extends Sync {
- final void lock() {
- // 【第一步:不管三七二十一,先直接尝试CAS修改状态,把state从0改成1】
- if (compareAndSetState(0, 1))
- // 如果抢成功了!立马把锁的主人设为自己,然后直接返回,成功获取锁。
- setExclusiveOwnerThread(Thread.currentThread());
- else
- // 如果第一步没抢到,那就调用AQS提供的标准acquire方法。
- acquire(1);
- }
- // ... 后续还有其他方法
- }
复制代码 源码注释:
- compareAndSetState(0, 1): 这是AQS提供的一个CAS操作,它尝试将 state 字段(可以理解为锁的计数器)从0改为1。0代表锁空闲,大于0代表被持有。这是实现锁的基石。
- setExclusiveOwnerThread(Thread.currentThread()): 这也是AQS父类中的方法,就是简单地记录下当前是哪个线程持有了这个独占锁。
思考一下:为什么叫“非公平”?就因为这一步!它完全不看后面有没有线程在排队等待,自己直接上来就抢。这就像你去排队买奶茶,突然有个人插队到最前面直接点单,这就是“非公平”。但如果他抢失败了(CAS返回false),他就得老实地去后面排队(调用 acquire(1))。
如果没抢到,就会调用 acquire(1)。这是AQS的核心方法,是一个模板方法,它定义了获取资源的总体流程,但其中一些关键步骤留给子类自己实现。- // AbstractQueuedSynchronizer.java
- public final void acquire(int arg) {
- // 这是一个非常经典的条件判断流程:
- // 1. 首先再尝试一次获取(tryAcquire,由子类实现)
- // 2. 如果获取失败,则将当前线程包装成节点加入队列(addWaiter)
- // 3. 然后在队列中自旋或阻塞地等待(acquireQueued)
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- // 如果acquireQueued返回true,代表等待过程中线程被中断了,
- // 这里重新设置一下中断标志位(因为阻塞过程中中断状态被清除了)
- selfInterrupt();
- }
复制代码 这个方法就像是一个工作流引擎,我们一步步拆解。
关键点一:再次尝试获取 - tryAcquire(arg)
tryAcquire 在AQS里是抽象的,具体实现看子类,也就是我们的 NonfairSync。- // ReentrantLock.NonfairSync
- protected final boolean tryAcquire(int acquires) {
- // 直接调用了父类Sync实现的一个非公平获取方法
- return nonfairTryAcquire(acquires);
- }
- // ReentrantLock.Sync
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState(); // 获取当前锁状态
- if (c == 0) {
- // 【状态为0,锁又空闲了!机会来了,再次尝试CAS抢锁!】
- // 这就是非公平的第二次体现:即使可能在排队,新来的线程依然有机会抢
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true; // 成功!
- }
- }
- // 【关键点:如何实现可重入?】
- else if (current == getExclusiveOwnerThread()) {
- // 如果state不为0,但持有锁的线程就是当前线程自己
- int nextc = c + acquires; // 那就把state直接加上acquires(通常是1)
- if (nextc < 0) // 溢出检查,int最大值是2147483647,重入次数不能超过这个
- throw new Error("Maximum lock count exceeded");
- setState(nextc); // 设置新的state值。注意这里不需要CAS,因为锁本来就是自己占着的
- return true; // 获取成功,这就是重入!
- }
- // 如果锁被别的线程占着,或者自己CAS又没抢过别人,那就返回false,获取失败。
- return false;
- }
复制代码 可重入锁的实现奥秘就在这里! 它通过检查当前线程是否是锁的持有者来实现。如果是,就把 state 简单地 +1。释放的时候,也需要释放相应的次数(state -1),直到减为0才算真正释放。这就是为什么 lock() 和 unlock() 必须要成对出现的原因。
如果 tryAcquire 返回 false,意味着获取又失败了。工作流引擎就会继续往下走:addWaiter(Node.EXCLUSIVE)。
关键点二:线程入队 - addWaiter(Node mode)
是时候把当前线程放入等待队列了。Node.EXCLUSIVE 代表这是一个独占模式的节点。- // AbstractQueuedSynchronizer.java
- private Node addWaiter(Node mode) {
- // 1. 以给定模式创建当前线程的新节点
- // mode有两种:Node.EXCLUSIVE(独占)或Node.SHARED(共享)
- Node node = new Node(Thread.currentThread(), mode);
- // 快速尝试:直接CAS设置新的尾节点,如果成功就直接返回。
- Node pred = tail; // 获取当前尾节点
- if (pred != null) {
- node.prev = pred; // 新节点的前驱指向当前尾节点
- if (compareAndSetTail(pred, node)) { // CAS操作,把tail指针指向新节点
- pred.next = node; // 将原尾节点的后继指向新节点,完成双向链表连接
- return node;
- }
- }
- // 如果快速尝试失败(比如并发入队导致CAS失败),或者队列还没初始化(pred==null)
- // 就进入一个循环,不断尝试入队,直到成功
- enq(node);
- return node;
- }
- // 循环入队,保证肯定能成功
- private Node enq(final Node node) {
- for (;;) { // 自旋循环
- Node t = tail;
- if (t == null) { // 如果队列是空的,必须初始化
- // CAS地设置一个哑元节点(Dummy Node)作为头节点
- if (compareAndSetHead(new Node()))
- tail = head; // 头尾都指向这个新节点
- } else {
- // 和快速尝试里的逻辑一样,CAS地将新节点设为尾节点
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t; // 返回旧的尾节点
- }
- }
- }
- }
复制代码 源码注释:
- CLH队列:AQS的队列是一个虚拟的CLH队列的变种。它是一个FIFO的双向链表。
- 初始化:队列懒惰初始化。第一个节点入队时,会先创建一个不包含任何线程的“哑元节点”或“哨兵节点”作为头节点。头节点可以认为就是当前持有锁的节点。
- 入队步骤:创建新节点 -> 将新节点的prev指向当前tail -> CAS将tail指向新节点 -> 将原tail的next指向新节点。注意:prev指针是稳定的,而next指针在并发情况下可能暂时不一致,这也是为什么唤醒时有时需要从后往前遍历的原因(我们后面会看到)。
现在,线程已经被成功包装成Node,放入等待队列的尾部了。接下来就是它在队列中的“修炼”了:acquireQueued。
关键点三:队列中的等待与唤醒 - acquireQueued(final Node node, int arg)
这个方法让已经在队列中的节点,以自旋(循环)的方式不断尝试获取锁,如果失败且判断需要休息,就安心阻塞(park),直到被前驱节点唤醒。- // AbstractQueuedSynchronizer.java
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true; // 标记是否最终失败(比如被取消)
- try {
- boolean interrupted = false; // 标记等待过程中是否被中断过
- for (;;) { // 自旋,核心循环
- final Node p = node.predecessor(); // 获取当前节点的前驱节点
- // 【关键判断】:如果前驱节点是头节点,说明自己是队列里的第一个等待者
- // 有资格再去尝试获取一次锁!
- if (p == head && tryAcquire(arg)) {
- setHead(node); // 获取成功!把自己设置为新的头节点
- p.next = null; // 帮助GC,断开旧头节点的链接
- failed = false;
- return interrupted; // 返回中断状态
- }
- // shouldParkAfterFailedAcquire: 检查获取失败后是否应该park阻塞
- // parkAndCheckInterrupt: 如果应该,那就park阻塞,并在被唤醒后检查中断状态
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true; // 如果park过程中被中断,记录中断状态
- }
- } finally {
- if (failed)
- cancelAcquire(node); // 如果最终失败(比如异常),取消当前节点
- }
- }
- // 检查并更新节点的状态,告诉它“你该休息了,等前驱节点叫你”
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus; // 获取前驱节点的等待状态
- if (ws == Node.SIGNAL) // Node.SIGNAL(-1): 表示“后继节点需要被唤醒”
- // 前驱节点状态正确,可以安心park了
- return true;
- if (ws > 0) { // ws>0 只有CANCELLED(1),表示前驱节点已取消
- // 那就一直往前找,找到一个有效(非取消)的节点,并排在它后面
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // ws是0或PROPAGATE(-3),把前驱节点的状态CAS地设为SIGNAL
- // 告诉它“你释放锁的时候记得叫我啊!”
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false; // 这次先不park,下次循环再来检查
- }
- // 阻塞当前线程,并在被唤醒后返回线程的中断状态
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this); // 调用Unsafe的park方法,线程在此处阻塞
- return Thread.interrupted(); // 被唤醒后,清除并返回中断标志
- }
复制代码 源码注释:
- 自旋获取:只要前驱是头节点,就有资格不断尝试 tryAcquire。这减少了不必要的阻塞唤醒开销。
- SIGNAL状态:这是节点间的一种“约定”。一个节点的 waitStatus 为 SIGNAL,意味着它释放锁时有责任唤醒它的后继节点。shouldParkAfterFailedAcquire 方法的核心工作就是确保自己的前驱节点是这个状态,这样自己才能放心地去阻塞。
- 清理取消的节点:在寻找有效前驱时,会跳过那些已取消 (CANCELLED) 的节点,维护队列的健康。
- park:这是最终让线程进入等待状态的地方,底层调用 Unsafe.park(),非常高效。
- 新的头节点:当节点成功获取锁后,它会成为新的头节点。旧的头节点会被断开链接。头节点代表的永远是当前持有锁的节点(或刚刚释放锁的节点)。
走到这里,一个获取锁失败的线程,它的 lock() 调用之旅就暂时告一段落了——它要么成功获取了锁,要么已经在队列中安静地阻塞(park)了,等待着被唤醒的那一天。
最后一站:释放锁 - lock.unlock()
持有锁的线程执行完临界区代码后,必须在 finally 中调用 unlock() 来释放锁,以便唤醒后继等待的线程。让我们看看这又是如何发生的。- // ReentrantLock.java
- public void unlock() {
- sync.release(1); // 同样是委托给sync,调用AQS的release模板方法
- }
- // AbstractQueuedSynchronizer.java
- public final boolean release(int arg) {
- // 1. 尝试释放锁(tryRelease,由子类实现)
- if (tryRelease(arg)) {
- Node h = head; // 获取当前头节点
- // 如果头节点不为空,并且waitStatus不为0(通常是SIGNAL,表示有后继需要唤醒)
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h); // 2. 唤醒后继节点
- return true;
- }
- return false;
- }
复制代码 又是一个模板方法!release 先调用 tryRelease 尝试释放,如果完全释放成功了(state==0),就去看看队列里有没有需要被唤醒的兄弟。
关键点四:释放状态 - tryRelease(int releases)
这个方法在 ReentrantLock.Sync 中实现。- // ReentrantLock.Sync
- protected final boolean tryRelease(int releases) {
- // 计算释放后的state
- int c = getState() - releases;
- // 非常重要的一点:如果当前线程不是锁的持有者,抛异常!
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- // 如果state减为0了,说明锁完全释放了,可以清空持有线程标记
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c); // 更新state(即使不为0,也可能是重入释放了一次)
- return free; // 返回是否完全释放
- }
复制代码 源码注释:
- 重入释放:可重入锁的释放必须次数匹配。每次 unlock 只减1,只有最后一次释放才会将 state 减到0,并将 exclusiveOwnerThread 设为 null。
- 状态检查:如果当前线程压根没持有锁,直接抛异常,防止乱释放。
如果 tryRelease 返回 true(锁已完全释放),就会去执行 unparkSuccessor(h)。
关键点五:唤醒后继 - unparkSuccessor(Node node)
这是AQS队列唤醒的核心
[code]// AbstractQueuedSynchronizer.javaprivate void unparkSuccessor(Node node) { // node在这里是头节点,即刚刚释放完锁的节点 int ws = node.waitStatus; if (ws < 0) // 如果状态是SIGNAL等小于0的状态 // CAS地将头节点状态置为0,表示“唤醒任务我已开始处理” compareAndSetWaitStatus(node, ws, 0); // 获取头节点的后继节点,准备唤醒它 Node s = node.next; // 【关键点】:如果后继节点不存在或者已被取消... if (s == null || s.waitStatus > 0) { s = null; // ...那就从尾节点开始,从后往前遍历,找到离头节点最近的、有效的(未取消的)节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus |