admin管理员组文章数量:1599541
个人愚见欢迎指正
reentrantLock的分析
测试代码
public class LockTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
new Thread(() -> {
reentrantLock.lock();
},"thread1").start();
System.out.println("=====");
new Thread(() -> {
reentrantLock.lock();
},"thread2").start();
}
}
进入java.util.concurrent.locks.ReentrantLock#lock方法
public void lock() {
sync.lock();
}
sync是ReentrantLock的一个内部类,它继承AbstractQueuedSynchronizer,也就是通常说的aqs抽象队列同步器,这里的sync有一个子类NonfairSync,NonfairSync也是一个内部类
进入java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
static final class NonfairSync extends Sync {
//...
final void lock() {
//乐观锁的方式更新,比较的值为0,如果更新的时候依然等于0,则把它更新为1,表示这把锁被占用了
if (compareAndSetState(0, 1))
//设置当前线程为锁的持有者
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//...
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
再跟就是native方法compareAndSwapInt了
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
进入Unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);//获得cas对象的地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;//比较期望值并交换
UNSAFE_END
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
assert(sizeof(jbyte) == 1, "assumption.");
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int; //获得当前值
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur; //声明一个新的值
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) { //如果当前值和比较值相等
jint res = cmpxchg(new_val, dest_int, cur);//汇编指令CAS操作,比较值为cur,更新值为new_val
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}
回到java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
static final class NonfairSync extends Sync {
//...
final void lock() {
//如果这里cas更新失败返回false
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//...
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
//如果和锁的持有者是同一个线程就重入
if (!tryAcquire(arg) &&
//把自己加入阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//上面如果是true,表示自己被中断,这里响应中断
selfInterrupt();
}
进入java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获得aqs的state
int c = getState();
//如果state等于0说明这把锁没有被占用
if (c == 0) {
//乐观锁的方式将state设置为1,并将当前线程设置为lock的拥有者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程和lock的拥有者是同一个线程,则重入
else if (current == getExclusiveOwnerThread()) {
//state+1
int nextc = c + acquires;
//超过了int的最大值会变成负数,抛异常
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//把当前节点设置为aqs中等待获得锁的链表的最后一个,第一个加入的node没有pred,pred==null
if (pred != null) {
node.prev = pred;
//乐观锁设置链表尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
//获得链表尾
Node t = tail;
//第一次加入wait的node时,没有tail和head需要初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败后是否需要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//LockSupport.park(this)挂起当前线程,没获取到锁在这里被挂起
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在reentrantLock.unlock中释放锁的时候,唤醒挂起的线程
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//重入锁lock的时候state+1,unlock的时候需要将state-1,当lock加锁和unlock解锁次数相等的时候,state=0返回true
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//清除等待的信号
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒下一个线程
if (s != null)
LockSupport.unpark(s.thread);
}
condition的分析
在使用condition的时候await方法会使当前线程放弃持有的锁
测试代码
public class ConditionTest {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
reentrantLock.lock();
System.out.println("thread1=====");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
reentrantLock.unlock();
System.out.println("thread1=====结束");
},"thread1").start();
Thread.sleep(50);
new Thread(() -> {
reentrantLock.lock();
System.out.println("=====thread2");
condition.signal();
reentrantLock.unlock();
},"thread2").start();
}
}
先看看condition的await方法
进入java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//加入等待Condition的node队列
Node node = addConditionWaiter();
//释放持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//是否在同步队列中
while (!isOnSyncQueue(node)) {
//挂起当前线程,在这里等待condition.sign()
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//这里是被condition.sign唤醒后需要把之前设置为0的state设置成原来的值
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果等待condition的最后一个节点是空的,说明当前线程是第一个节点
if (t == null)
firstWaiter = node;
else
//把自己放在最后一个节点
t.nextWaiter = node;
lastWaiter = node;
return node;
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获得锁重入的次数
int savedState = getState();
//释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
//把state设置为0,把当前锁的拥有线程exclusiveOwnerThread设置为null,即释放占有的锁(前提条件当前线程必须是锁的拥有者,比如condition.await()前要先调用lock.lock()等方法占有锁)
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//上面讲了,这里用乐观锁设置aqs的state,并且设置lock的拥有线程为当前线程
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
然后来看看condition.wait()
进入java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//拿到等待condition的第一个node
Node first = firstWaiter;
if (first != null)
//唤醒这个node的线程
doSignal(first);
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//唤醒线程
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
进入java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//把这个线程放进获取锁的队列,让他自己去争抢锁(thread2执行reentrantLock.unlock()会unpark thread1 因为这里把thread1重新放入等待获取锁的队列中了)
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
condition.await 会把当前线程的node放入等待condition的链表中,然后释放当前持有的锁,在condition.sign的时候,会从等待condition的链表中取出node,放入争抢锁资源的node链表中,让他们自己去争抢资源
本文标签: 线程是否会conditionawaitReentrantLock
版权声明:本文标题:condition的await是否会释放线程占有的reentrantLock 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728321664a1153903.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论