目录
- 1. 是什么
- 2. 如何使用
- 3. 实现原理
- 3.1. uml
- 3.2. 创建Condition对象
- 3.2.1. 创建AQS.ConditionObject对象
- 3.2.1.1. ConditionObject内部也有一个双向队列
- 3.3. await方法【阻塞等待】
- 3.3.1. 加入condition队列尾部
- 3.3.2. 调用AQS解锁,释放互斥量
- 3.3.3. 检测是否在AQS队列,不在则需要阻塞
- 3.3.4. 当前节点已经在AQS队列中了,获取锁
- 3.4. signalAll方法【唤醒所有阻塞等待的节点】
- 3.4.1. 把condition队列的所有节点转移到AQS队列中
- 3.4.1.1. 每转移一个condition队列中的节点到aqs队列中,就唤醒一个
- 3.5. signal方法【只唤醒头部阻塞等待的节点】
- 4. 为什么await需要先释放锁,而signal不需要
- 5. 总结
- 6. 参考链接
1. 是什么
类似object的wait和notify方法配合synchronized使用
condition的await和notify方法配合Lock使用,用来实现条件等待与唤醒
2. 如何使用
- public class ConditionTest
- {
- private Lock lock;//一个锁说明读写互斥
- private int capacity;
- private List<Object> items;
- private Condition notFull;//用来唤醒写线程
- private Condition notEmpty;//用来唤醒读线程
- public ConditionTest(int capacity)
- {
- this.capacity = capacity;
- this.items = new ArrayList<>();
- this.lock = new ReentrantLock();
- this.notFull = lock.newCondition();
- this.notEmpty = lock.newCondition();
- }
- public void add(Object data) throws InterruptedException
- {
- try
- {
- lock.lock();
- //新增的时候如果已经满了,那么等待 非满信号 唤醒
- while (this.items.size() == capacity)
- {
- this.notFull.await();
- }
- //增加了一个元素,那么 唤醒非空
- this.items.add(data);
- this.notEmpty.signalAll();
- }
- finally
- {
- lock.unlock();
- }
- }
- public Object remove() throws InterruptedException
- {
- try
- {
- lock.lock();
- //删除的时候已经空了,那么等待 非空信号 唤醒
- while (this.items.size() == 0)
- {
- this.notEmpty.await();
- }
- //删除了一个元素,那么 唤醒非满
- Object data = this.items.remove(0);
- this.notFull.signalAll();
- return data;
- }
- finally
- {
- lock.unlock();
- }
- }
- public static void main(String[] args)
- {
- ConditionTest conditionTest = new ConditionTest(5);
- new Thread(() -> {
- for (int i = 0; i < 1000; i++)
- {
- try
- {
- conditionTest.add(i);
- System.out.println(String.format("生产者放入%d", i));
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }).start();
- new Thread(() -> {
- try
- {
- while (true)
- {
- Object data = conditionTest.remove();
- System.out.println(String.format("消费者消费%d", data));
- }
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }).start();
- }
- }
复制代码 3. 实现原理
3.1. uml
3.2. 创建Condition对象
- public Condition newCondition() {
- //调用Sync的newCondition方法
- return sync.newCondition();
- }
复制代码 3.2.1. 创建AQS.ConditionObject对象
- final ConditionObject newCondition() {
- //AQS的ConditionObject
- return new ConditionObject();
- }
复制代码 3.2.1.1. ConditionObject内部也有一个双向队列
- public class ConditionObject implements Condition, java.io.Serializable {
- //condition队列也是一个双向队列
- private transient Node firstWaiter;
- private transient Node lastWaiter;
- public ConditionObject() { }
- }
复制代码 结构如下图:
没错,Condition队列和AQS就是两个不同队列,Condition的操作就是在这两个队列中来回移动
3.3. await方法【阻塞等待】
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- //加入condition队列尾部
- Node node = addConditionWaiter();
- //调用AQS解锁,释放互斥量(执行await肯定是在获取了锁后的)
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- //调用AQS死循环检测是否在AQS队列中,不在的话阻塞当前线程。
- //什么时候加入AQS队列呢?signal的时候
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- //获已经在AQS队列中了,获取锁
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- //如果node不是condition队列的尾节点
- if (node.nextWaiter != null) // clean up if cancelled
- //那么遍历删除conditoin队列中所有cancel节点
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
复制代码
- 5行:加入condition队列尾部
- 7行:调用AQS解锁,释放互斥量(由此可知执行await肯定是在获取了锁后的)
- 11-15行:不停地检查是否在AQS阻塞队列中,不在的话阻塞当前线程。等待唤醒继续检查
- 17-22行:到达这里的时候说明已经在AQS队列中了,并且已被唤醒,那么我就要去抢占锁了。如果抢占失败继续回到11-15行
下面对这几个步骤作详细说明
3.3.1. 加入condition队列尾部
- private Node addConditionWaiter() {
- //队尾
- Node t = lastWaiter;
- //队尾的状态不为CONDITION(即为CANCEL)
- if (t != null && t.waitStatus != Node.CONDITION) {
- //删除conditoin队列中所有cancel节点
- unlinkCancelledWaiters();
- //重新从尾节点开始
- t = lastWaiter;
- }
- //构造节点(当前线程,CONDITION状态)
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- //把节点加入condition队列尾部
- //这是队列为空的情况
- if (t == null)
- firstWaiter = node;
- //队列不为空的情况
- else
- t.nextWaiter = node;
- lastWaiter = node;//新的尾节点
- return node;
- }
复制代码 上面的代码所作的就是用当前线程构造成Condition节点,加入Condition队列的尾部;
除此之外,unlinkCancelledWaiters还会从头部开始往后删除conditoin队列中所有cancel节点,如下:
- private void unlinkCancelledWaiters() {
- //从头节点出发
- Node t = firstWaiter;
- Node trail = null;
- //遍历condition队列
- while (t != null) {
- Node next = t.nextWaiter;
- //如果节点状态为CANCEL
- if (t.waitStatus != Node.CONDITION) {
- //那么从condition队列中删除
- t.nextWaiter = null;
- //头节点是CANCEL的,那就修改头节点
- if (trail == null)
- firstWaiter = next;
- //头节点不是CANCEL的,那就修改前一个节点的nextWaiter
- else
- trail.nextWaiter = next;
- if (next == null)
- lastWaiter = trail;
- }
- //节点状态不为CANCEL,那么跳过
- else
- trail = t;
- //继续下一个节点
- t = next;
- }
- }
复制代码 3.3.2. 调用AQS解锁,释放互斥量
- final int fullyRelease(Node node) {
- boolean failed = true;
- try {
- //获取当前互斥量
- int savedState = getState();
- //调用AQS.release释放这些互斥量
- if (release(savedState)) {
- //释放成功后返回释放的互斥量个数
- failed = false;
- return savedState;
- } else {
- throw new IllegalMonitorStateException();
- }
- } finally {
- //解锁失败需要把当前节点置为CANCEL状态
- if (failed)
- node.waitStatus = Node.CANCELLED;
- }
- }
复制代码- public final boolean release(int arg) {
- //调用AQS.tryRelease释放锁
- if (tryRelease(arg)) {
- //释放锁成功后把AQS队列的头节点的线程唤醒
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
复制代码 由于ReentrantLock重写的了AQS的tryRelease,因此调用的是ReentrantLock.tryRelease,如下:
3.3.2.1. 尝试释放互斥量
- protected final boolean tryRelease(int releases) {
- //计算释放完releases个信号量还剩多少要释放
- int c = getState() - releases;
- //解锁的必须和加锁同一线程
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {//剩余0个说明解锁成功
- free = true;
- setExclusiveOwnerThread(null);//置持有锁的线程为空
- }
- //设置剩余的信号量
- //由于解锁的只有一个线程,所以这里不需要使用CAS操作设置state
- setState(c);
- return free;
- }
复制代码 3.3.3. 检测是否在AQS队列,不在则需要阻塞
- final boolean isOnSyncQueue(Node node) {
- //node的状态是CONDITION,说明还在condition队列中 或者 前一个节点为空
- if (node.waitStatus == Node.CONDITION || node.prev == null)
- return false;//返回false,表示不在AQS队列中
- //next不为空(next是AQS队列专用,nextWaiter是Condition队列专用),一定在AQS队列中
- if (node.next != null) // If has successor, it must be on queue
- return true;//返回true,表示在AQS队列中
-
- //以上两种情况都不符合,那么只能到AQS队列中查找
- return findNodeFromTail(node);
- }
-
- private boolean findNodeFromTail(Node node) {
- Node t = tail;
- //从尾开始遍历,找到node
- for (;;) {
- if (t == node)
- return true;
- if (t == null)
- return false;
- t = t.prev;
- }
- }
复制代码 3.3.4. 当前节点已经在AQS队列中了,获取锁
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- //死循环直到获取锁成功
- for (;;) {
- //逻辑1.
- //当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- //获取锁成功后设置头节点为当前节点
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- //逻辑2.
- //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。
- //什么时候唤醒?释放锁的时候
- //唤醒之后干什么?继续死循环执行上面的逻辑1
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- //如果发生了异常,那么执行下面的cancelAcquire方法
- if (failed)
- cancelAcquire(node);
- }
- }
复制代码 3.3.4.1. 判断是否需要阻塞
- shouldParkAfterFailedAcquire
- //根据(前一个节点,当前节点)->是否阻塞当前线程
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程
- if (ws == Node.SIGNAL)
- return true;
- //前一个节点状态>0,即CANCEL。
- //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点
- if (ws > 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- // 前置节点状态>=0,即0或者propagate。
- //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?
- } else {
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
复制代码 3.3.4.1.1. 阻塞当前线程
- private final boolean parkAndCheckInterrupt() {
- //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记
- LockSupport.park(this);
- return Thread.interrupted();
- }
复制代码 3.4. signalAll方法【唤醒所有阻塞等待的节点】
- ConditionObject signalAll
- public final void signalAll() {
- //如果当前线程不是持有互斥量的线程,直接抛出异常
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- //Condition队列不为空
- Node first = firstWaiter;
- if (first != null)
- //把condition队列的所有节点转移到AQS队列中并唤醒所有线程
- doSignalAll(first);
- }
复制代码 3.4.1. 把condition队列的所有节点转移到AQS队列中
- private void doSignalAll(Node first) {
- //清空condition队列的头、尾节点
- lastWaiter = firstWaiter = null;
- //遍历condition队列
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- //转移到AQS队列中
- transferForSignal(first);
- first = next;
- } while (first != null);
- }
复制代码 3.4.1.1. 每转移一个condition队列中的节点到aqs队列中,就唤醒一个
- final boolean transferForSignal(Node node) {
- //当前节点是CONDITION状态,CAS设置为0,如果成功继续15行
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;//CAS设置失败,那么返回false表示唤醒失败
- //调用AQS enq,把当前节点加入AQS队列
- Node p = enq(node);
- int ws = p.waitStatus;
- //如果该结点的状态为cancel 或者 修改waitStatus为SIGNAL失败
- //没搞懂这个条件什么意思
- if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- //唤醒当前节点的线程
- LockSupport.unpark(node.thread);
- return true;
- }
复制代码 3.4.1.1.1. 如何转移的
- private Node enq(final Node node) {
- //死循环直到加入队尾成功
- for (;;) {
- Node t = tail;
- //队列为空初始化头节点(占位符)
- if (t == null) {
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {//加入队尾
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 3.5. signal方法【只唤醒头部阻塞等待的节点】
- public final void signal() {
- //调用ReentrantLock的方法判断当前线程是否持有锁的线程
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- //condition队列不为空
- Node first = firstWaiter;
- if (first != null)
- doSignal(first);
- }
复制代码 3.5.1. 唤醒头节点
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)//修改头结点,完成旧头结点的移出工作
- lastWaiter = null;
- first.nextWaiter = null;
- //将老的头结点,加入到AQS的等待队列中
- //一旦成功唤醒一个,那么退出循环返回(signalAll是唤醒所有)
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
复制代码 4. 为什么await需要先释放锁,而signal不需要
因为await可能需要阻塞,所以在阻塞前需要先释放锁。
5. 总结
condition的一系列操作其实只涉及了AQS队列和condition队列的来回移动
- 当执行await方法时,会把当前线程加入到condition队列中,然后释放锁。接着不断检查是否在AQS队列中。
是的话开始竞争锁,只有AQS队列中的首节点能抢占成功。否则挂起
- 当执行signalAll方法时,会把condition队列中所有节点转移到AQS队列中,并唤醒所有线程。被唤醒的节点会退出是否在AQS队列中的检查,开始抢占锁
- 当执行signal方法时,会把condition队列中头节点转移到AQS队列中,并唤醒该头节点的线程。被唤醒的节点会退出是否在AQS队列中的检查,开始抢占锁
6. 参考链接
- [Java 锁]Condition 详解 - 简书
- 并发编程之 Condition 源码分析 - 掘金
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |