admin管理员组

文章数量:1599543

本文将介绍接口Condition的实现原理。
Condition是在Lock中使用,可以通过调用Lock.newCondition()获得一个Condition对象。每个Condition对象都与一个Lock对象相关,调用Condition对象的方法前必须获得对应Lock对象的锁,Condition的作用与Object的wait()/notify()作用类似,调用Condition.await()可以阻塞当前线程,Condition.signal()/signalAll()可以唤醒其他阻塞线程。

文章目录

  • 一、Condition
  • 二、ConditionObject
    • 1、await()
    • 2、signal()/signalAll()

一、Condition

Condition是一个接口,其定义如下:

public interface Condition {
	//阻塞当前线程,等待同一个Condition对象上的signal()/signalAll()唤醒
	//该方法响应中断,如果发生中断,该方法抛出InterruptedException异常
    void await() throws InterruptedException;
	///阻塞当前线程,等待同一个Condition对象上的signal()/signalAll()唤醒
	//与上面方法的区别是,该方法等待过程中不响应中断
    void awaitUninterruptibly();
	//阻塞线程,线程被阻塞指定的时间
	//当线程被中断、超时或者signal()/signalAll(),都会唤醒线程
    long awaitNanos(long nanosTimeout) throws InterruptedException;
	//同awaitNanos()
    boolean await(long time, TimeUnit unit) throws InterruptedException;
	//同awaitNanos()
    boolean awaitUntil(Date deadline) throws InterruptedException;
	//唤醒任意一个等待线程,注意只唤醒一个
    void signal();
	//唤醒所有的等待线程
    void signalAll();
}

java8提供了一个Condition的实现类ConditionObject,该类是在AbstractQueuedSynchronizer中实现的,我们可以调用ReentrantLock.newCondition()/ReadLock.newCondition()/WriteLock.newCondition()得到一个与锁相关的Condition对象。
下面介绍一个ConditionObject的实现原理。

二、ConditionObject

下面介绍ConditionObject的await()、signal()和signalAll()三个方法,其他方法实现原理可以参考await()。

1、await()

调用await()前,必须获得对应Lock对象的锁,否则抛出java.lang.IllegalMonitorStateException异常。调用await()后当前线程会被阻塞,同时释放已经获得的锁。await()方法的作用与Object.wait()方法作用类似。
被阻塞的线程可以通过中断唤醒或者signal()/signalAll()唤醒。
下面看一下源码:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())//检查是否已经发生中断
                throw new InterruptedException();
            //每个线程创建一个Node对象,将Node对象放入等待signal的队列
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);//释放当前线程已经获得的锁
            int interruptMode = 0;
            //isOnSyncQueue()方法检查当前线程是否已经在锁的等待队列中
            //注意这里涉及两个队列,一个队列是等待signal的队列,另一个是等待锁的队列
            //isOnSyncQueue()检查当前线程是否在锁的等待队列中
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);//阻塞当前线程
                //如果发生了中断,checkInterruptWhileWaiting()将node节点添加到锁的等待队列中
                //并修改节点状态为0(0表示初始状态)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //acquireQueued()用于申请锁,如果没有申请上,会被阻塞,阻塞后的线程会被中断或者其他线程释放锁唤醒
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
            	//将已经取消的节点,也就是发生线程中断的节点从signal等待队列中删除
                unlinkCancelledWaiters();
            if (interruptMode != 0)
            	//根据interruptMode值不同,可以重新发起线程中断或者抛出InterruptedException异常
                reportInterruptAfterWait(interruptMode);
        }

进入await()方法后,会先调用addConditionWaiter()将当前线程添加到一个等待队列中。该队列是一个单向队列,使用属性firstWaiter和lastWaiter分别表示队列的头结点和尾结点,队列中的每个节点使用Node表示,Node对象持有当前线程对象和节点状态。所有等待signal的线程都会进入该队列中,signal()和signalAll()方法也从该队列中选择线程唤醒。新加入的节点都添加到队列尾。

        //addConditionWaiter()针对每个线程创建一个Node对象,
        //并将该Node对象放到等待signal队列的尾
        //该等待队列使用firstWaiter和lastWaiter分别表示队列的头结点和尾节点
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
            	//如果节点状态不是Node.CONDITION,表示对应线程已经取消锁申请,
            	//或者正在等待锁,还有可能已经申请到锁
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建Node对象,
            //状态Node.CONDITION表示当前节点正在等待signal唤醒
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

线程添加到等待signal的队列中后,接着调用fullyRelease()方法释放所有持有的锁,然后使用一个while循环检查当前节点是否已经加入到了锁等待队列中,以及是否发生线程中断。这里出现了两个队列,一个队列是等待signal的队列,一个是等待锁的队列,signal等待队列中线程可以被signal()和signalAll()方法唤醒,锁等待队列中的线程是正在等待锁,也就是调用了Lock.lock()方法后线程没有申请到锁,而进入等待队列,这些线程可以被Lock.unlock()唤醒。
初始的时候,线程只在signal等待队列中,当发生线程中断或者当前线程被signal()和signalAll()方法唤醒,那么该线程也会进入锁等待队列中,此时isOnSyncQueue()方法为true。
在这个while循环中会使用checkInterruptWhileWaiting()方法检查是否发生线程中断:

   		private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?//判断是否发生线程中断,如果发生,进入transferAfterCancelledWait()
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
	    final boolean transferAfterCancelledWait(Node node) {
	    	//将节点状态修改为初始状态,也就是0
	        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
	        	//将当前节点添加到锁等待队列中
	            enq(node);
	            return true;
	        }
			//如果修改节点状态失败,说明当前节点已经被signal唤醒了,
			//那么下面进入自旋状态,直到节点被加入到锁等待队列中再退出
	        while (!isOnSyncQueue(node))
	            Thread.yield();
	        return false;
	    }

执行完while循环后,下面就要调用acquireQueued()申请锁了,acquireQueued()方法根据锁等待队列中当前节点是否是头结点而决定是否分配锁,如果不是头结点就继续阻塞当前节点。
在await()方法的最后,调用了reportInterruptAfterWait()来决定是否重新发起中断还是直接抛出InterruptedException异常。如果当前线程是因为中断被唤醒的,那么抛出InterruptedException异常,如果当前线程被signal唤醒,唤醒后又发生了中断,那么reportInterruptAfterWait()会将中断补上,重新再发起一次线程中断。

        //根据interruptMode值不同,可以重新发起线程中断或者抛出InterruptedException异常
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

简单总结一下await()方法,每次进入await()方法前都需要持有锁,进入await()方法后,会将持有的锁全部释放,然后将线程阻塞,等待signal或者线程中断,当两者其中一个发生后,线程被唤醒,接着再次申请之前释放掉的锁,如果锁申请成功后,则开始后续应用程序的逻辑。

2、signal()/signalAll()

先来看一下signal()方法。

        public final void signal() {
        	//isHeldExclusively()检查当前线程是否持有锁,如果是,返回true。
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //firstWaiter表示signal等待队列的头结点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        //唤醒等待节点,从队列头开始遍历节点,如果节点唤醒失败,则尝试唤醒下一个节点
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
	    final boolean transferForSignal(Node node) {
	    	//修改节点状态,将节点状态修改为初始状态,也就是0
	    	//如果节点状态修改失败,说明当前线程已经被中断,停止唤醒
	        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
	            return false;
			//将节点放入锁等待队列
	        Node p = enq(node);
	        int ws = p.waitStatus;
	        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
	        	//如果当前线程取消了锁的申请,或者因为不知明的原因导致状态修改失败,那么唤醒一次线程
	        	//有一种情况可以导致状态修改失败,调用了signal()后,同时发生了线程中断,
	        	//await()方法调用acquireQueued()申请锁,acquireQueued()方法会修改节点状态,
	        	//当acquireQueued()方法先于上面状态修改时,会导致上面状态修改失败 
	            LockSupport.unpark(node.thread);
	        return true;
	    }

signal()方法相对于await()方法要简单一些。它从signal等待队列的头开始遍历,找到第一个没有发生过中断的线程(发生了中断的线程已经被唤醒了),将该线程加入到锁等待队列中,等待申请锁。signal()方法处理完后,会释放锁,如果刚才加入到锁等待队列中的线程申请到了锁,那么该线程就会被唤醒。

signalAll()与signal()方法有很多相似的地方。signalAll()也是从signal等待队列的头开始遍历,将队列中的每个节点都调用一次transferForSignal()方法,transferForSignal()将每个线程都添加到锁等待队列中,等待申请锁。

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

本文标签: 详解原理LockconditionSignal