admin管理员组文章数量:1599900
condition
condition将Object中的通信方法(wait\notify\notifyAll)分解成不同的对象,以便将这些对象与任意Lock组合使用,更加灵活。
condition的强大之处在于他可以为多线程通信之间建立不同的condition,使用原来synchronized/wait()只有一个阻塞队列,这里有一个弊端,因为这种方式在同一个Object的的wait和notify,例如队列已满,所有的生产者现场阻塞,某个时刻消费者消费了一个元素,则现在需要唤醒某个生产者线程,如果这时正好也有消费者线程也在阻塞中,则很可能唤醒的是一个消费者线程。
而使用lock/condition,可以实现多个阻塞队列,signal只会唤起某个condition阻塞队列下的await线程。比如上面这种情况,可以分别给生产者和消费者定义2个condition,那么就可以指定唤醒生产者线程,而不用担心唤醒消费者线程,因为condition不同,xxx.signal只会唤醒xxx.await下面阻塞的线程。
下面写两个案例,第一个案例:我们先给生产者和消费者定义同一个condition。第二个案例是分别生产者和消费者定义2个condition,看看异同。
生产和消费定义同一个condition
生产者
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionProducer implements Runnable {
private Lock lock;
private Condition condition;
private AtomicInteger count;
public ConditionProducer(Lock lock, Condition condition, AtomicInteger integer) {
this.lock = lock;
this.condition = condition;
this.count = integer;
}
@Override
public void run() {
for (; ; ) {
try {
lock.lock();
while (count.intValue() >= 10) { // 池子小于最大值(这里设置10,自定义)
// 池子满了阻塞
System.out.println("池子满了阻塞,等待消费。。。。。。");
condition.await();
// Thread.sleep(500);
}
count.incrementAndGet();
System.out.println("池子生产了 count=" + count);
condition.signal(); // 唤醒消费者线程
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
消费者
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ConditionConsumer implements Runnable {
private Lock lock;
private Condition condition;
private AtomicInteger count;
public ConditionConsumer(Lock lock, Condition condition, AtomicInteger integer) {
this.lock = lock;
this.condition = condition;
this.count = integer;
}
@Override
public void run() {
for (; ; ) {
try {
lock.lock();
while (count.intValue() <= 0) { // 池子不为空
// 池子为空 阻塞
System.out.println("池子空了,等待生产count=" + count);
condition.await();
// Thread.sleep(500);
}
System.out.println("开始消费 count=" + count);
count.decrementAndGet();
condition.signal();// 唤醒生产者可以生产
// Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
测试
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.*;
public class Test {
public static void main(String[] args){
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
AtomicInteger integer = new AtomicInteger(0);
ConditionConsumer conditionConsumer = new ConditionConsumer(lock,condition,integer);
ConditionProducer conditionProducer = new ConditionProducer(lock,condition,integer);
// 启动一个消费者
new Thread(conditionConsumer).start();
// 启动一个生产者
new Thread(conditionProducer).start();
}
}
- 这里有个问题,上面只定义了一个消费者和一个生产者, 如果定义多个消费者线程和多个生产者线程,可能出现一个现象,消费者发现了池子空了就await阻塞了,生产者此时正在生产,有些生产者线程发现池子满了,也阻塞了,那么生产者里面的condition.signal(),此时不一定唤醒的是消费者,也能仍然是生产者,因为消费者和生产者线程用的是同一个condition。这种情况就跟wait和notify很相似。
怎么解决呢,就是分别定义两个condition给生产者和消费者用。
改造如下:
生产消费分别定义2个condition
升级版生产者
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
class ConditionProducer2 implements Runnable {
private Lock lock;
// 生产者condition
private Condition putCondition;
// 消费者condition
private Condition takeCondition;
// 假设一个池子
private AtomicInteger count;
public ConditionProducer2(Lock lock, Condition putCondition, Condition takeCondition, AtomicInteger integer) {
this.lock = lock;
this.putCondition = putCondition;
this.takeCondition = takeCondition;
this.count = integer;
}
@Override
public void run() {
for (; ; ) {
try {
lock.lock();
while (count.intValue() >= 10) { // 池子小于最大值(这里设置10,自定义)
// 池子满了阻塞
System.out.println("池子满了阻塞,等待消费。。。。。。");
putCondition.await();
// Thread.sleep(500);
}
count.incrementAndGet();
System.out.println("池子生产了 count=" + count);
takeCondition.signal(); // 唤醒消费者线程
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
升级版消费者
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author yanming
* @version 1.0.0
**/
public class ConditionConsumer2 implements Runnable {
private Lock lock;
// 生产者condition
private Condition putCondition;
// 消费者condition
private Condition takeCondition;
// 假设一个池子
private AtomicInteger count;
public ConditionConsumer2(Lock lock, Condition putCondition, Condition takeCondition, AtomicInteger integer) {
this.lock = lock;
this.putCondition = putCondition;
this.takeCondition = takeCondition;
this.count = integer;
}
@Override
public void run() {
for (; ; ) {
try {
lock.lock();
while (count.intValue() <= 0) { // 池子不为空
// 池子为空 阻塞
System.out.println("池子空了,等待生产count=" + count);
takeCondition.await();
// Thread.sleep(500);
}
System.out.println("开始消费 count=" + count);
count.decrementAndGet();
putCondition.signal();// 唤醒生产者可以生产
// Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
测试
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test2 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition putCondition = lock.newCondition();
Condition takeCondition = lock.newCondition();
AtomicInteger integer = new AtomicInteger(0);
ConditionConsumer2 conditionConsumer2 = new ConditionConsumer2(lock, putCondition, takeCondition, integer);
ConditionProducer2 conditionProducer2 = new ConditionProducer2(lock, putCondition, takeCondition, integer);
new Thread(conditionConsumer2).start();
// new Thread(conditionConsumer2).start();
//
// new Thread(conditionProducer2).start();
new Thread(conditionProducer2).start();
}
}
这样就可以避免线程之间通信错误了。
- 当生产者调用takeCondition.signal()只会唤醒takeCondition.await()下的消费者线程。
- 当消费者调用putCondition .signalAll()只会唤醒putCondition .await()下的生产者线程。
优化
这里还有可以优化的地方,我们发现消费者和生产用的是同一个ReentrantLock,这就导致消费者和生产者不能同时进行操作,其实这里也可以定义2个ReentrantLock。 这也是ArrayBlockingQueue和LinkedBlockingQueue不同的地方。不同地方可以参考这一篇末尾线程池详解
ArrayBlockingQueue为什么没用用双锁呢,之所以没这样去做,猜测是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
源码中ArrayBlockingQueue定义1个lock,2个condition,这个就类似上面写的案例中的写法
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
LinkedBlockingQueue的定义了2个lock,2个condition
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
版权声明:本文标题:condition实现生产者和消费者 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/xitong/1728321650a1153901.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论