admin管理员组文章数量:1599541
一、Condition的作用
ReentrantLock虽然实现了互斥,但是如何实现进程间的相互通信呢?
这就需要借助Condition来实现。
就像synchronized实现互斥,同时配合notify()和wait()方法来实现线程的通信一样。
Condition必须和Lock配合使用,其提供await()方法和signal()方法,作用类似wait()和notify(),用来实现线程间的通信。
二、具体实现
-
首先,我们创建一个ReentrantLock对象之后,调用该对象的newCondition()方法创建一个Condition对象。
-
底层为调用了Sync.newCondition()方法,创建了一个ConditionObject对象。
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { return new ConditionObject(); }
-
ConditionObject对象内部维护了一个双向链表,和AQS类似,持有一个firstWaiter和一个lastWaiter。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; //.....其他代码 }
-
await()方法的实现
public final void await() throws InterruptedException { //正要执行await,被中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //加入Condition的等待队列中,此时只是Node加入到了队列,线程还未阻塞 Node node = addConditionWaiter(); //释放锁 int savedState = fullyRelease(node); int interruptMode = 0; //检测节点是否在AQS阻塞队列中,如果不在,则唤醒自己也无法争抢锁,继续睡眠 //此处为假设两个线程,A持有锁,A调用await(),B直接被阻塞,然后A调用await(),让出锁, //同时node插入到条件等待队列,且自己不再同步队列中,所以睡眠 //此时会唤醒B,B获得锁,然后B调用signal(),将A的Node从条件阻塞队列加入到同步队列, //然后调用await(),B进入原来A的状态。同时在fullyRelease()函数中唤醒阻塞的第一个线程 //A醒来后,进行循环判断,判断自己在AQS中, while (!isOnSyncQueue(node)) { LockSupport.park(this);//自己阻塞自己 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //被signal()方法唤醒后,要重新尝试获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //清理条件队列中所有状态不是CONDITION状态的节点 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //addConditionWaiter(),总体逻辑就是将Node加入到条件等待队列的队尾 private Node addConditionWaiter() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建一个新的Node Node node = new Node(Node.CONDITION); //如果队列为空,即队列为空时,将node插入到队列中,并且作为队列第一个节点 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } final int fullyRelease(Node node) { try { int savedState = getState();//获取当前的重入次数 if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒阻塞队列上的第一个 return true; } return false; } final boolean isOnSyncQueue(Node node) { //状态为Condition,获取前驱节点为null,返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //后继节点不为null,肯定在CLH同步队列中,因为如果Node中pre和next变量标记AQS阻塞队列的前驱和后继 //nextWaiter记录条件等待队列上的后继,所以next只有在进入到AQS阻塞队列才会设置 if (node.next != null) return true; //其他情况,从AQS阻塞队列的队尾开始遍历,查看当前节点是否在阻塞队列中,没想到这是什么情形。。。 return findNodeFromTail(node); }
-
signal()方法的实现
public final void signal() { if (!isHeldExclusively()) //判断当前线程是否获取了锁,如果无锁,则抛异常 throw new IllegalMonitorStateException(); Node first = firstWaiter;//得到条件阻塞队列的队头 if (first != null) doSignal(first);//唤醒队头第一个,不同于AQS的阻塞队列 //从条件阻塞队列唤醒后,还需要加入到AQS队列阻塞中 } //将队头节点移入到AQS阻塞队列中,并将该节点从条件队列中移除 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //transferForSignal()将头节点移动到AQS阻塞队列 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // final boolean transferForSignal(Node node) { //判断当前node是否为CONDITION状态,如果不是则直接返回 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; //将P加入到AQS阻塞队列中 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
注意:signal()只是将线程从Condition的条件阻塞队列移动到了AQS的阻塞队列,此时线程并没有被唤醒,只有当持有锁的线程调用了await()方法或者unlock()方法后,才会唤醒AQS阻塞队列中被阻塞的线程。而如果不调用signal(),则阻塞在条件队列上的线程永远不可能有机会进入到AQS阻塞队列。
三、DEMO
举最简单的两个线程A,B轮流打印1-100
public class OneToHundred {
public static void main(String[] args) throws InterruptedException {
Task t = new Task();
Thread t1 = new Thread(t,"A");
Thread t2 = new Thread(t,"B");
t1.start();
t2.start();
}
}
class Task implements Runnable{
private int number = 0;
private ReentranLock lock = new ReentranLock();
private Condition condition = lock.newCondition();
@Override
public void run(){
while(number<100){
lock.lock();
number++;
condition.signal();
try{
if(number<100)
condition.await();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//假设A线程先获得锁,B会阻塞(同步队列),A第一次调用signal(),但此时条件队列为空,firstWaiter为null,
//然后A调用await(),让出锁,同时自己进入条件等待队列,用一个变量记录自己持有锁的次数(saveState)
//然后释放锁的时候唤醒同步队列的第一个阻塞线程,即B,B醒了以后,尝试获取锁
//假设只有这两个线程,则B获取锁成功,number++,然后signal(),将A加入到同步队列,然后await(),
//释放锁,唤醒A,将自己阻塞。
//A醒来后,判断自己是否在同步队列中,在的话跳出自旋(这个地方肯定是在同步队列中的,因为唤醒只会唤醒
//同步队列上的线程。此处while循环是怕线程在别的情况下醒来)
//继续执行await()方法,没执行完,在await()方法内部阻塞了
//然后调用acquire(node,saveState)尝试获得锁,A获取成功,然后number++,在signal(),再调用await()
//这样A和B可以一直输出到100
//但是缺少了unlock()操作,ReentrantLock被A和B分别重入50次,A和B可以相互交替运行完全是因为await()和
//signal()的机制,所以未改正前,当A退出时,锁被A持有,且state = 50,表示重入了50次。
//调用了signal()也只是把B线程从条件队列移动到同步队列,没有调用await(),没让出锁。
//即便在最后调用一次unlock(),也只是将state - 1,变为49,依然未释放锁。
//所以,在每次lock()之后一定要unlock().
//改正
class Task2 implements Runnable{
private int number = 0;
private ReentranLock lock = new ReentranLock();
private Condition condition = lock.newCondition();
@Override
public void run(){
while(number<100){
lock.lock();
number++;
condition.signal();
try{
if(number<100)
condition.await();
}catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
本文标签: ReentrantLockcondition
版权声明:本文标题:ReentrantLock中Condition的使用 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728321362a1153864.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论