admin管理员组

文章数量:1599533

文章目录

  • 1. Condition作用
    • 1.1 什么是线程通信?
    • 1.2 Condition接口常用方法
  • 2. Condition实战例子
  • 3. 原理分析
    • 3.1 Condition
    • 3.2 await()方法
      • 3.2.1 addConditionWaiter()
      • 3.2.2 fullyRelease()
    • 3.3 signal方法

1. Condition作用

Condition的作用是负责线程通信。

在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步方式了。配合Object的wait()、notify()系列方法可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。Object和Condition接口的一些对比。摘自《Java并发编程的艺术》

其实Condition就是实现线程协作,让线程进入休眠(放弃锁)状态和唤醒(获得锁)状态

注意:要想理解Condition原理,需要先了解AQS,不了解AQS的可以看先之前的文章:

  • 《synchronized 和reentrantlock的优缺点》 介绍了2种锁之间的联系和区别,方便我们理解,因为只有熟悉了synchronized,才能更好的理解reentrantlock。

  • 《AQS中的底层实现和源码分析(ReentrantLock为例)》介绍了AQS的加锁原理。

1.1 什么是线程通信?

我们知道Synchronized是负责加锁的,一旦持有锁,就会产生互斥,别的线程就只好阻塞等待锁的释放,在典型的生产者消费者场景下,假设生产者A持有锁,负责往队列中插入数据,消费者B需要等待A释放锁,才能从队列中取数据。

如果队列是有界的,那么如果生产者A在入队操作发现队列已经满了的话,会wait,释放锁资源,这时消费者B会获取锁,然后进行出队操作,同时试图唤醒生产A,让其继续产生入队操作。

如果队列为空后,B也需要wait,释放锁资源,唤醒A。

典型的生产者消费者问题,如果没有线程协作,那么会导致死锁问题。

这样的流程就是线程的通信,负责协作。假设没有协作的话,假设A持有锁,发现队列满了,那么只好释放锁,但是释放锁后,A可能又会抢占锁,而线程B无法抢到锁,进行出队操作,这样效率就很低。

对比wait让持有锁的线程A释放cpu资源,类似休眠状态,不会再去争抢没有意义的锁,等到B的唤醒,这时队列有空余空间了,A再重新去获取锁,这才是有意义的去抢锁

Synchronized锁的协作过程,可以参见 《线程并发协作(生产者/消费者模式)》

在Synchronized加锁状态时,是使用wait/notify/notifyAll进行线程间的通信。那么在使用ReentrantLock加锁时,是如何实现线程间通信问题的呢? 在JUC中既然提供了Lock,也提供了用作其线程间通信的方式,尅使用Condition来休眠和唤醒线程。

1.2 Condition接口常用方法

  • condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
  • await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
  • await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  • awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
  • awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  • signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  • signalAll() :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

2. Condition实战例子

唤醒线程:

package com.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ConditionNotify implements Runnable {

    private Lock lock;

    private Condition condition;

    public ConditionNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        try {
            //加锁
            lock.lock();

            System.out.println("begin - notify");
            //唤醒
            condition.signal();
            //间隔2s
            TimeUnit.SECONDS.sleep(2);

            System.out.println("end - notify");

        }catch (Exception e){

        }finally {
            //释放锁
            lock.unlock();
        }
    }

}

休眠阻塞线程:

package com.test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ConditionWait implements Runnable {

    private Lock lock;

    private Condition condition;

    public ConditionWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        try {
            //加锁
            lock.lock();

            System.out.println("begin - wait");
             //间隔2s
            TimeUnit.SECONDS.sleep(2);
            //休眠
            condition.await();
            System.out.println("end - wait");

        }catch (InterruptedException e){

        }finally {
            lock.unlock();
        }
    }
}

运行类:

package com.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionMain {
    public static void main(String[] args) throws Exception{
        //创建锁
        Lock lock = new ReentrantLock();
        //创建Condition
        Condition condition = lock.newCondition();

        // 保证两个线程获取的是同一把锁 和 同一个COndition

        new Thread(new ConditionWait(lock, condition)).start();
       
        new Thread(new ConditionNotify(lock, condition)).start();
    }
}


运行结果:

begin - wait
begin - notify
end - notify
end - wait

3. 原理分析

在学习AQS的时候,知道存在CLH同步等待队列,用于lock加锁时,未拿到锁的线程会加入该队列,并等待依次获取锁。

其实还存在一个队列,就是Condition条件队列,它负责存放调用wait的线程。

假如两个线程A、B同时争抢同一把锁,线程A先获取到锁的时候,线程B出入等待队列中,队列状态如下:

3.1 Condition

final ConditionObject newCondition() {
          return new ConditionObject();
  }

Condition对象使用执行lock.newCondition()获取的,会创建一个ConditionObject对象返回。我们先来看一下ConditionObject的结构:

private transient Node firstWaiter;
      /** Last node of condition queue. */
      private transient Node lastWaiter;

它有两个成员对象(很重要),分别表示头结点和尾节点构成一个单向队列,当调用await方法时,会将线程加入的这个队列中,下文会讲。

注意:条件队列时单向链表!

3.2 await()方法

线程A获取到锁处于运行状态,当调用Condition的await方法时,具体做了哪些事情:

	public final void await() throws InterruptedException {
	    if (Thread.interrupted())
	        throw new InterruptedException();
	    // 将线程加入到Condition队列中
	    Node node = addConditionWaiter();
	    // 释放锁
	    int savedState = fullyRelease(node);
	    int interruptMode = 0;
	    while (!isOnSyncQueue(node)) {
	        LockSupport.park(this);
	        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
	            break;
	    }
	    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
	        interruptMode = REINTERRUPT;
	    if (node.nextWaiter != null) // clean up if cancelled
	        unlinkCancelledWaiters();
	    if (interruptMode != 0)
	        reportInterruptAfterWait(interruptMode);
	}

await方法调用addConditionWaiter方法,将线程加入到Condition队列中去,然后调用在await方法中调用fullyRelease释放锁,后续走AQS逻辑会从AQS队列中唤醒等待线程B,这个时候线程B获取到锁。

3.2.1 addConditionWaiter()

addConditionWaiter方法,将线程加入到Condition队列中去:

	private Node addConditionWaiter() {
           Node t = lastWaiter;
           // If lastWaiter is cancelled, clean out.
           if (t != null && t.waitStatus != Node.CONDITION) {
               unlinkCancelledWaiters();
               t = lastWaiter;
           }
           Node node = new Node(Thread.currentThread(), Node.CONDITION);
           if (t == null)
               firstWaiter = node;
           else
               t.nextWaiter = node;
           lastWaiter = node;
           return node;
       }

当前线程ThreadA加入条件队列,如下图:

3.2.2 fullyRelease()

线程A调用fullyRelease释放锁:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

线程A释放锁后,线程B获得锁,线程A也就从AQS队列中删除了:

3.3 signal方法

当线程B运行时,会调用signal方法,signal方法逻辑如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 将Condition队列中的线程移除,加入到
        doSignal(first);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
     // 加入到AQS队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果没有线程占用锁,就唤醒该线程
        LockSupport.unpark(node.thread);
    return true;
}

signal方法执行后,将Condition队列中的线程加入到AQS队列,(如果这个时候线程B运行完释放了锁就会唤醒线程B,)队列状态如下:

直到两个线程都运行完成,两个队列都会为空。


参考:
《JUC-Condition使用以及Condition原理分析》
JUC常见工具类使用及原理分析–Condition

本文标签: 线程原理condition