admin管理员组

文章数量:1584632

Condition

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒

condition的使用

//生产者消费者模型代码
@Override
public void run() {
    while (true) {
        lock.lock();
        try {
            while (goodsList.size() == maxCount) {
                System.out.println("生产者先等等。。。");
                condition.await();
            }
            Thread.sleep(30);
            goodsList.add(++a);
            System.out.println("生产者生产商品:" + a);
            condition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

@Override
public void run() {
    while (true) {
        lock.lock();
        try {
            while (goodsList.isEmpty()) {
                System.out.println("消费者先等等。。。");
                condition.await();
            }
            Thread.sleep(30);
            System.out.println("消费者消费商品:" + goodsList.remove());
            condition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

condition 中两个最重要的方法,一个是 await,一个是 signal 方法

  • await:把当前线程阻塞挂起

  • signal:唤醒阻塞的线程

源码分析

调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,在上面那个案例中,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况,一个线程获得锁,另一个线程进入到同步队列

condition.wait()

//AQS队列中 AbstractQueuedSynchronizer
public final void await() throws InterruptedException {
    if (Thread.interrupted())//如果被中断,则抛出异常
        throw new InterruptedException();
    //创建一个新的节点,节点的状态是condition,数据结构为链表
    Node node = addConditionWaiter();
    //释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
    //并且缓存当前线程的state 后续唤醒时继续设置,考虑重入锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断节点是否在同步队列 如果不在同步队列 则阻塞线程
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //当线程被唤醒后 继续执行以下逻辑 唤醒线程
    //当这个线程醒来,会尝试拿锁,当acquireQueued返回false就是拿到锁了.
    //interruptMode != THROW_IE -> 表示这个线程被中断,但signal执行了enq方法让其入队了.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//-1
        // 将这个变量设置成 REINTERRUPT.
        interruptMode = REINTERRUPT;//1
    //如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 如果线程被中断了,需要根据transferAfterCancelledWait的返回结果判断怎么处理
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

//创建一个新的节点,节点的状态是condition,数据结构为单向链表
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果lastWaiter节点为cancel状态,则清理掉
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //第一次进来的时候 lastwaiter为null
    //创建一个新的节点,节点的状态是condition,数据结构为链表 传ThreadB
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //lastWaiter 设置firstWaiter为node
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

unlinkCancelledWaiters()

//从头开始遍历,将cancel状态的节点清理
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

fullyRelease(Node node)

//释放资源
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //释放方法同lock.unlock()一样
        //不同的是,这里不管重入了几次都是一次释放
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

isOnSyncQueue(Node node)

//判断调用condition.await的线程是否在同步队列
final boolean isOnSyncQueue(Node node) {
    //如果waitStatus为CONDITION 或者prev为null一定在同步队列 因为condition等待队列为单项列表 且没有next节点 为nextwait节点
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //循环遍历
    return findNodeFromTail(node);
}

findNodeFromTail(Node node)

//从后往前遍历,是因为添加到AQS队列是先设置prev连接
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            //遍历到,直接返回
            return true;
        if (t == null)
            //遍历完之后 没有则返回false
            return false;
        t = t.prev;
    }
}

checkInterruptWhileWaiting(Node node)

//如果线程被中断,则执行transferAfterCancelledWait 否则返回0
//线程被中断后,通过transferAfterCancelledWait
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}
如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续的处理应该是抛出InterruptedException还是重新中断。

transferAfterCancelledWait(Node node)

final boolean transferAfterCancelledWait(Node node) {
    //cas设置节点awaitState为0,如果设置失败则表示线程cancel
    //使用 cas 修改节点状态,如果还能修改成功,说明线程被唤醒时,signal还没有被调用。
    //这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了locksupport.unpark,也可能是调用了线程的 interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        如果 cas 成功,则把node 添加到 AQS 队列
        enq(node);
        return true;
    }
    //如果cas失败,则判断当前node是否已经在AQS队列上,如果不在,则让给其他线程执行
    //当node被触发了signal方法时,node就会被加到aqs队列上
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
该方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为 true 表示中断在signal调用之前,signal还未执行,那么这个时候会根据await的语
义,在await时遇到中断需要抛出interruptedException,返回true就是告诉
checkInterruptWhileWaiting返回THROW_IE(-1)。
如果返回 false,否则表示signal已经执行过了,只需要
重新响应中断即可

condition.signal()

public final void signal() {
    //判断当前共享资源中的Thread是否和当前线程一致,否则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //拿到等待队列的头节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        //将下一个节点设置为头结点,并且将first.nextWaiter置空
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    //设置ThreadA节点的waitStatus=0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将ThreadA加入到AQS
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒ThreadA
        LockSupport.unpark(node.thread);
    return true;
}

总结

await 和 signal 的总结

我把前面的整个分解的图再通过一张整体的结构图来表述,线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了 condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从 await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁

释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁获取流程

流程图

本文标签: 详解condition