admin管理员组文章数量:1599887
1.概述
Condition是JAVA1.5中出现的,它的作用是对原生的wait()、notify()/notifyAll()方法进行增强,相对于传统的wait()和notify()方法,Condition中的await()和signal()这种方式实现线程中协作更加高效。Condition是一个接口,主要依赖于JUC中的Lock接口,它的await()、signal()方法都要在lock同步锁内完成,也就是该部分代码必须存在于lock.lock()、lock.unlock()之间。本文将从Condition的使用方式,分析Condition的源码,帮助大家更好的理解其原理。
2.Condition和Signal使用及原理
2.1 分析案例
假如有三个不同的线程 A、B、C 将会共用一个 FooThread实例。线程 A 将会调用 first() 方法,线程 B 将会调用 second() 方法
线程 C 将会调用 third() 方法,请设计修改程序,以确保 second() 方法在 first() 方法之后被执行,third() 方法在 second() 方法之后被执行。
解析:这道题的要考察的原理是要考察线程的有序执行,线程A首先执行完成,线程B监测到线程A执行完成后执行线程B,线程C同样检测到线程B执行完成后再执行。
2.1.1 wait()/notify()方式实现
实现代码如下:
public class PrintThreadFirst implements Runnable{
@Override
public void run() {
System.out.println("线程A已经执行了");
}
}
public class PrintThreadSecond implements Runnable{
@Override
public void run() {
System.out.println("线程B已经执行了");
}
}
public class PrintThreadThird implements Runnable{
@Override
public void run() {
System.out.println("线程C已经执行了");
}
}
public class MainTest {
private static Object object = new Object();
private static volatile int flag = 1;
public void first(Runnable printFirst) throws InterruptedException {
synchronized (object) {
while (flag != 1) {
object.wait();
}
printFirst.run();
flag = 2;
object.notifyAll();
}
}
public void second(Runnable printSecond) throws InterruptedException {
synchronized (object) {
while (flag != 2) {
object.wait();
}
printSecond.run();
flag = 3;
object.notifyAll();
}
}
public void third(Runnable printThird) throws InterruptedException {
synchronized (object) {
while (flag != 3) {
object.wait();
}
printThird.run();
object.notifyAll();
flag=0;
System.exit(1);
}
}
public static void main(String[] args) {
MainTest mainTest = new MainTest();
new Thread(() -> {
while (true) {
try {
mainTest.second(new PrintThreadSecond());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
mainTest.first(new PrintThreadFirst());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
mainTest.third(new PrintThreadThird());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
运行结果如下:
2.1.2 await()/signal()方式实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class AwaitSignalTest {
private static volatile int flag = 1;
private ReentrantLock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void first(Runnable printFirst) throws InterruptedException {
lock.lock();
while (flag != 1) {
condition1.await();
}
printFirst.run();
flag = 2;
condition2.signal();
lock.unlock();
}
public void second(Runnable printSecond) throws InterruptedException {
lock.lock();
while (flag != 2) {
condition2.await();
}
printSecond.run();
flag = 3;
condition3.signal();
lock.unlock();
}
public void third(Runnable printThird) throws InterruptedException {
lock.lock();
while (flag != 3) {
condition3.await();
}
printThird.run();
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
AwaitSignalTest awaitSignalTest = new AwaitSignalTest();
new Thread(() -> {
try {
awaitSignalTest.first(new PrintThreadFirst());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
awaitSignalTest.third(new PrintThreadThird());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//Thread.sleep(2000L);
new Thread(() -> {
try {
awaitSignalTest.second(new PrintThreadSecond());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
运行结果如下:
2.2 Condition 源码分析
2.2.1 Condition接口
public interface Condition {
/**
* 让线程进入等待,如果其他线程调用同一Condition对象的notify/notifyAll,那么等待的线程可能被唤醒
*/
void await() throws InterruptedException;
/**
* 不抛出中断异常的await方法
*/
void awaitUninterruptibly();
/**
* 带超时的await
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 带指定超时时间的await
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 带指定截止时间的await
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒等待的线程
*/
void signal();
/**
* 唤醒所有线程
*/
void signalAll();
}
2.2.2 AQS中Condition的使用
在AQS的源码(AQS原理可查看上一篇文章:JAVA多线程基础篇-AQS(AbstractQueuedSynchronizer)),内部有一个类ConditionObject实现了Condition接口,作为等待队列。ConditionalObject内部字段如下:
//ConditionObject维护的等待队列的头节点
private transient Node firstWaiter;
//ConditionObject维护的等待队列的尾节点
private transient Node lastWaiter;
ConditionObject是一个单向队列,调用await()方法后线程依次尾插到等待队列中,具体原理图如下所示:
在实际应用中,每调用一次lock.newCondition()都会产生一个新的等待队列,也就是说,一个lock锁可以持有多个等待队列,在并发包Lock中,一个Lock锁对应一个阻塞队列和多个等待队列,而在对象监视器上只有一个阻塞队列和一个等待队列。Lock锁中对应阻塞队列原理图如下:
1.ConditionObject.await()
public final void await() throws InterruptedException {
//线程中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//新增一个节点到等待队列尾部
//此处等待队列与AQS中的阻塞队列是两个不同的队列
//若当前线程能够调用await(),则说明此时线程拥有锁,此时AQS阻塞队列中,线程应该处于head节点
Node node = addConditionWaiter();
//释放当前的锁,并获取锁的状态,并唤醒AQS队列中的一个线程
long savedState = fullyRelease(node);
//是否中断标识
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有signal,则将当前线程阻塞
//判断这个节点是否在AQS队列上,第一次判断的是 false,因为前面已经释放锁了
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//检查是否是由于被中断而唤醒,如果是,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在阻塞队列中尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//节点已经在阻塞队列中,与Condition的等待队列联系断开
//对于SIGNAL唤醒的线程而言,SIGNAL时除了将节点移到阻塞队列,同时也清空了node.nextWaiter
//而对于中断唤醒的线程而言,只是将节点移到阻塞队列,并没有清空node.nextWaiter(因为此时线程不持有,操作等待线程并非线程安全)
if (node.nextWaiter != null) // clean up if cancelled
//从队列中清除非CONDITION状态的节点
unlinkCancelledWaiters();
//根据interruptMode 决定是否需要抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上述方法的主要流程如下图所示:
在步骤1中,主要调用了addConditionWaiter()方法,其源码如下:
private Node addConditionWaiter() {
//获取Condition队列中的尾节点,赋值给t
Node t = lastWaiter;
// 若尾节点存在且状态不为CONDITION,调用unlinkCancelledWaiters()方法清除阻塞队列中失效节点(被取消的节点)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建Node节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果t为null,将node置为等待队列中的头节点
if (t == null)
firstWaiter = node;
else
//否则将node置于等待队列尾部
t.nextWaiter = node;
//更新链表尾节点为node
lastWaiter = node;
return node;
}
这个方法的作用就是将线程包装成node,并添加到等待队列的队尾。
步骤2中调用fullRelease()释放锁,主要通过调用AQS中的模板方法release()方法是否AQS的同步状态并唤醒在同步队列中头节点的后继节点,如果释放成功则正常返回,如果失败则抛出异常。具体代码如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
2.signal()
public final void signal() {
//非独占式锁,直接抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取ConditionObject的头节点
Node first = firstWaiter;
if (first != null)
//唤醒等待队列中的头节点
doSignal(first);
}
signal()方法的主要作用是将Condition等待队列中的头节点唤醒,重新移至AQS的阻塞队列。
3.doSignal(Node first)
private void doSignal(Node first) {
do {
//找出第一个需要唤醒的节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//断开头节点和后继节点之间的关系
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); //若等待队列中的头节点唤醒失败,且队列中还有其它节点,继续唤醒其它节点
}
因为头节点可能会被取消,所以这里一直在等待队列中从前往后寻找并依次唤醒,直至一个节点成功唤醒或者等待队列中没有节点需要唤醒。
4.transferForSignal(Node node)
这个方法的主要作用是将等待队列中的节点移动到AQS的阻塞队列中(该方法主要在signal()相关方法中被调用),
final boolean transferForSignal(Node node) {
//CAS更新节点的waitStatus值为CONDITION
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
//操作失败,说明节点已经被中断操作过,waitStatus已经变成了0
return false;
//CAS操作成功,将等待队列节点添加到阻塞队列的尾部
Node p = enq(node);
//获取当前节点的waitStatus状态,
int ws = p.waitStatus;
//node的上一个节点被取消,或者尝试设置node节点的上一个节点的状态为SIGNAL失败了,表示node前一个节点被设置成CANCELLED 状态,唤醒node节点上的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
//如果node的prev节点已经是SIGNAL状态,那么被阻塞线程的唤醒工作由AQS阻塞队列来完成
return true;
}
这段代码主要做两件事情:(1)将头节点状态更改为CONDITION;(2)调用enq()方法将该节点尾插入到AQS阻塞队列中。对应await()方法中部分为:
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
3.小结
1.Condition的出现是对原生wait()/notify()的增强,它使得线程之间协作更加高效;
2.在Condition调用await()方法后,线程会释放锁资源,如果节点不在AQS阻塞队列,则阻塞当前线程,如果在阻塞队列,则自旋尝试获取锁;调用signal()方法通知后,节点会从condition队列移动到AQS阻塞i队列,进入正常获取锁流程;
3.Condition是基于AQS实现的,Condition的实现类ConditionObject是AQS的内部类,底层使用了一部分AQS的逻辑;
4.AQS是一个双向等待队列,Condition是一个单项等待队列,二者之间并无明确联系,节点在阻塞状态被唤醒后,会从等待队列移动至同步队列中。
4.参考文献
1.https://www.jianshu/p/28387056eeb4
2.https://wwwblogs/insaneXs/p/12219097.html
本文标签: 多线程进阶篇JavaconditionSignal
版权声明:本文标题:JAVA多线程进阶篇-Condition和Signal 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728321275a1153852.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论