admin管理员组文章数量:1599532
目录
- 一、概述
- 二、Condition接口常用方法
- 三、Condition基本使用
- 测试类
- wait类
- signal类
- 运行结果
- 四、源码分析
- await方法系列
- await()
- addConditionWaiter()
- fullyRelease(node)
- isOnSyncQueue(node)
- **下面方法为其他线程调用signal方法后,被唤醒的线程将要执行的方法**
- checkInterruptWhileWaiting(node)
- acquireQueued(node, savedState)
- reportInterruptAfterWait(interruptMode)
- signal方法系列
- signal()
- doSignal(first)
- transferForSignal(first)
- 四、总结
一、概述
- 在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步方式了。配合Object的wait()、notify()系列方法可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。Object和Condition接口的一些对比。摘自《Java并发编程的艺术》
二、Condition接口常用方法
- condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
- await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
- await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
- awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
- awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
- awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
- signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
- signalAll() :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
三、Condition基本使用
测试类
public class Demo {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoSignal signal = new ConditionDemoSignal(lock, condition);
ConditionDemoWait wait = new ConditionDemoWait(lock, condition);
new Thread(wait).start();
Thread.sleep(100);
new Thread(signal).start();
}
}
wait类
public class ConditionDemoWait implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
signal类
public class ConditionDemoSignal implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
}finally {
lock.unlock();
}
}
}
运行结果
四、源码分析
await方法系列
await()
- 调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了Condition 相关联的锁
public final void await() throws InterruptedException {
if (Thread.interrupted())//表示 await 允许被中断
throw new InterruptedException();
Node node = addConditionWaiter();//创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
int savedState = fullyRelease(node);//释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
LockSupport.park(this);//通过 park 挂起当前线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
- 这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如 果 lastWaiter 不等于空并且waitStatus 不等于 CONDITION 时,把冲好这个节点从链表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
- 执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队列
fullyRelease(node)
- fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();//获得重入的次数
if (release(savedState)) {//释放锁并且唤醒下一个同步队列中的线程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
- 假设:ThreadA和ThreadB竞争锁时ThreadA获得了锁,然后ThreadA调用await()方法
- 此时,同步队列会触发锁的释放和重新竞争。ThreadB 获得了锁。
- 同步队列中的head指向ThreadB,等待队列condition中的firstWaiter指向ThreadA
isOnSyncQueue(node)
- 判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在
- 如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒
- 如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限
- 为什么要做这个判断呢?原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal
的时候,会把当前节点从 condition 队列转移到 AQS 队列
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
下面方法为其他线程调用signal方法后,被唤醒的线程将要执行的方法
checkInterruptWhileWaiting(node)
- 判断 ThreadA 在 condition 队列被阻塞的过程中,有没有被其他线程触发过中断请求
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
acquireQueued(node, savedState)
- 当前被唤醒的节点ThreadA 去抢占同步锁。并且要恢复到原本的重入次数状态。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false;// 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed)// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
reportInterruptAfterWait(interruptMode)
- 根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。
- 如果是 THROW_IE,则抛出中断异常
- 如果是 REINTERRUPT,则重新响应中断
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal方法系列
signal()
- await 方法会阻塞 ThreadA,然后 ThreadB 抢占到了锁获得了执行权限,这个时候在 ThreadB 中调用了 Condition的 signal()方法,将会唤醒在等待队列中节点
public final void signal() {
if (!isHeldExclusively())//先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
throw new IllegalMonitorStateException();
// 拿到 Condition 队列上第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal(first)
- 对 condition 队列中从首部开始的第一个 condition 状态的节点,执行 transferForSignal 操作,将 node 从 condition队列中转换到 AQS 队列中,同时修改 AQS 队列中原先尾节点的状态(修改尾节点的状态是为了找到当前加入节点的安全点进行加入)
private void doSignal(Node first) {
do {
//从 Condition 队列中删除 first 节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;// 将 next 节点设置成 null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal(first)
- 该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒
final boolean transferForSignal(Node node) {
//更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//调用 enq,把当前节点添加到AQS 队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next节点需要停止阻塞),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);// 唤醒节点上的线程.
//如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
return true;
}
- 执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上
- 这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus,如果大于 0 或者设置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA 这个线程。否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA
四、总结
- await 和 signal 的总结 ,线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。
▄█▀█●各位同仁,如果我的代码对你有帮助,请给我一个赞吧,为了下次方便找到,也可关注加收藏呀
如果有什么意见或建议,也可留言区讨论
版权声明:本文标题:JUC常见工具类使用及原理分析--Condition 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1728321450a1153875.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论