admin管理员组文章数量:1599904
java.lang.Object提供了wait()、notify()/notifyAll(),通过wait()可以使当前线程释放持有的锁,进行等待状态,通过notify()/notifyAll()可以使唤醒当前线程,其可以对锁资源进行竞争。
在JUC提供的锁实现中,通过java.util.concurrent.locks.Conidtion提供了类似实现,可以完成线程的等待和唤醒。
演示示例:
首先,新建两个线程,一个用于演示线程等待,一个用于演示线程唤醒。
package com.securitit.serialize.locks;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionAwaitThread extends Thread {
// 锁实例.
private Lock lock;
// 条件实例.
private Condition condition;
// 构造方法.
public ConditionAwaitThread(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + ":获得锁.");
System.out.println(Thread.currentThread().getName() + ":线程等待.");
condition.await();
Thread.sleep(5000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + ":释放锁.");
lock.unlock();
}
}
}
package com.securitit.serialize.locks;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionSignalThread extends Thread {
// 锁实例.
private Lock lock;
// 条件实例.
private Condition condition;
// 构造方法.
public ConditionSignalThread(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + ":获得锁.");
System.out.println(Thread.currentThread().getName() + ":唤醒线程.");
condition.signal();
Thread.sleep(5000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + ":释放锁.");
lock.unlock();
}
}
}
然后,新建一个测试类,对等待和唤醒进行测试。
package com.securitit.serialize.locks;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTester {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception {
// 新建多个多线程,用以模拟多线程环境.
new ConditionAwaitThread(lock, condition).start();
new ConditionSignalThread(lock, condition).start();
}
}
输出结果:
Thread-0:获得锁.
Thread-0:线程等待.
Thread-1:获得锁.
Thread-1:唤醒线程.
Thread-1:释放锁.
Thread-0:释放锁.
从输出结果可以看出,线层Thread-0在调用Condition.await()方法时,线程释放锁和CPU时间片,线程Thread-1获得锁和CPU时间片,然后调用Condition.singal()唤醒了线程Thread-0,和Object的wait()、notify()/notifyAll()实现了相似的功能。
需要注意的是,使用Condition功能时,需要保证Lock已被获取且未被释放,如若未在持有锁的时间内使用Condition功能,会抛出java.lang.IllegalMonitorStateException异常。
源码分析:
Condition API:
public interface Condition {
// 线程等待,直至收到唤醒指令.
void await() throws InterruptedException;
// 线程等待,可响应中断.
void awaitUninterruptibly();
// 线程等待,指定超时时间.
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 线程等待,指定超时时间.
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 线程等待,直到指定时间.
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程.
void signal();
// 唤醒所有等待线程.
void signalAll();
}
Condition 实现:
Condition具体由AQS内部类ConditionObject来实现,通过AQS的同步队列来实现线程的等待和唤醒。
public class ConditionObject implements Condition, java.io.Serializable {
// 序列化版本号.
private static final long serialVersionUID = 1173984872572414699L;
// 第一个等待者.
private transient Node firstWaiter;
// 最后一个等待者.
private transient Node lastWaiter;
// Construct.
public ConditionObject() { }
// 新增等待者.
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若lastWaiter是取消状态,则清除它.
if (t != null && t.waitStatus != Node.CONDITION) {
// 解除所有取消的等待者.
unlinkCancelledWaiters();
t = lastWaiter;
}
// 当前线程新建节点.
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 若null == lastWaiter,则新建节点为firstWaiter.
// 若null != lastWaiter,则新建节点为lastWaiter的nextWaiter.
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒指定节点代表的线程.
private void doSignal(Node first) {
do {
// 若指定节点无下一节点,则lastWaiter=null.
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 唤醒指定节点代表的所有线程.
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 解除所有取消的等待者.
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// firstWaiter != null.
while (t != null) {
// firstWaiter.nextWaiter.
Node next = t.nextWaiter;
// firstWaiter非条件节点.
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 唤醒一个线程.
public final void signal() {
// 是否独占.
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒所有线程.
public final void signalAll() {
// 是否独占.
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 线程等待.
public final void awaitUninterruptibly() {
// 新增条件等待者.
Node node = addConditionWaiter();
// 全部释放.
int savedState = fullyRelease(node);
boolean interrupted = false;
// 是否在同步队列中.
while (!isOnSyncQueue(node)) {
// 线程等待.
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 线程等待.
public final void await() throws InterruptedException {
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 新增条件等待者.
Node node = addConditionWaiter();
// 全部释放.
int savedState = fullyRelease(node);
int interruptMode = 0;
// 是否在同步队列中.
while (!isOnSyncQueue(node)) {
// 线程等待.
LockSupport.park(this);
// 检查等待者的中断状态.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 解除所有取消的等待者.
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 线程等待.
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 新增条件等待者.
Node node = addConditionWaiter();
// 全部释放.
int savedState = fullyRelease(node);
// 计算死亡时间.
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 是否在同步队列中.
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 线程等待.
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检查等待者的中断状态.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 解除所有取消的等待者.
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 线程等待.
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
// 获取死亡时间.
long abstime = deadline.getTime();
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 新增条件等待者.
Node node = addConditionWaiter();
// 全部释放.
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
// 是否在同步队列中.
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
// 线程等待.
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 解除所有取消的等待者.
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 线程等待.
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 计算纳秒超时时间.
long nanosTimeout = unit.toNanos(time);
// 线程中断,抛出中断异常.
if (Thread.interrupted())
throw new InterruptedException();
// 新增条件等待者.
Node node = addConditionWaiter();
// 全部释放.
int savedState = fullyRelease(node);
// 计算死亡时间.
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
// 是否在同步队列中.
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
// 线程暂停.
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 纳秒超时时间.
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 解除所有取消的等待者.
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 是否AQS实现.
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
// 是否有等待者.
protected final boolean hasWaiters() {
// 是否被独占.
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 递归链表查找是否有等待者.
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 获取等待者队列长度.
protected final int getWaitQueueLength() {
// 是否被独占.
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
// 递归链表取得长度.
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 获取等待线程集合.
protected final Collection<Thread> getWaitingThreads() {
// 是否被独占.
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
// 递归链表取得所有线程.
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
Condition依赖AQS的ConditionObject内部类来完成线程的等待、唤醒,可以看出ConditionObject实际依赖的LockSupport来完成针对线程的park和unpark,使得可以达到与Object的wait()、notify()/notifyAll()相似的功能。
注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。
如果有哪里有不明白或不清楚的内容,欢迎留言哦!
版权声明:本文标题:Condition 深入源码解析 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1728322561a1154016.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论