admin管理员组文章数量:1530018
JUC
B栈视频:https://www.bilibili/video/BV1ar4y1x727/?p=71&spm_id_from=pageDriver&vd_source=cbfd569af4d6a422b4da91f9f313c750
博客参考:https://blog.xueqimiao/juc/3b6916/#_4%E3%80%81volatile%E7%89%B9%E6%80%A7
一、线程基础
进程:是程序的一次执行,是系统进行资源分配和调度的独立单位,每一个进程都有它自己的内存空间和系统资源。
进程具有的特征:
- 动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的
- 并发性:任何进程都可以同其他进行一起并发执行
- 独立性:进程是系统进行资源分配和调度的一个独立单位
- 结构性:进程由程序,数据和进程控制块三部分组成
线程:在同一个进程内又可以执行多个任务,而这每一个任务我们就可以看作是一个线程,一个进程会有1至多个线程。
线程是轻量级的进程,是程序执行的最小单元,使用多线程而不是多进程去进行并发程序的设计,是因为线程间的切换和调度的成本远远小于进程。
管程:Monitor
(监视器),也就是我们平时所说的锁
Monitor
其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
JVM
中同步是基于进入和退出监视器对象(Monitor
,管程对象)来实现的,每个对象实例都会有一个Monitor
对象,Monitor
对象会和Java
对象一同创建并销毁,它底层是由C++
语言来实现的。
Object o = new Object();
new Thread(() -> {
synchronized (o) {
}
}, "t1").start();
线程的状态
New
:表示刚刚创建的线程,这种线程还没有开始执行RUNNABLE
:运行状态,线程的start()
方法调用后,线程会处于这种状态BLOCKED
:阻塞状态。当线程在执行的过程中遇到了synchronized
同步块,但这个同步块被其他线程已获取还未释放时,当前线程将进入阻塞状态,会暂停执行,直到获取到锁。当线程获取到锁之后,又会进入到运行状态(RUNNABLE
)WAITING
:等待状态。和**TIME_WAITING
**都表示等待状态,区别是WAITING
会进入一个无时间限制的等,而TIME_WAITING
会进入一个有限的时间等待,那么等待的线程究竟在等什么呢?一般来说,WAITING
的线程正式在等待一些特殊的事件,比如,通过wait()
方法等待的线程在等待notify()
方法,而通过join()
方法等待的线程则会等待目标线程的终止。一旦等到期望的事件,线程就会再次进入RUNNABLE
运行状态。TERMINATED
:表示结束状态,线程执行完毕之后进入结束状态。
注意:从NEW
状态出发后,线程不能在回到NEW
状态,同理,处理TERMINATED
状态的线程也不能在回到RUNNABLE
状态
wait/sleep
的区别?
功能都是当前线程暂停,有什么区别?
wait
释放锁sleep
不释放锁
终止线程
一般来说线程执行完毕就会结束,无需手动关闭。但是如果我们想关闭一个正在运行的线程,有什么方法呢?可以看一下Thread
类中提供了一个stop()
方法,调用这个方法,就可以立即将一个线程终止,非常方便。
final Thread t1 = new Thread(() -> {
System.out.println("======begin===========");
boolean flag = true;
while (flag) {
}
System.out.println("======end===========");
}, "t1");
t1.start();
// 睡一秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 停止线程,不建议使用了,强制关闭,会造成意想不到的后果,所以建议不使用
t1.stop();
System.out.println("线程t1的状态为 => "+t1.getState());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程t1的状态为 => "+t1.getState());
输出结果
======begin===========
线程t1的状态为 => RUNNABLE
线程t1的状态为 => TERMINATED
线程中断
public void interrupt() // 中断线程,设置中断标志位true
public boolean isInterrupted() // 判断线程是否被中断 通过检查中断标志来判断
public static boolean interrupted() // 判断线程是否被中断,并清除当前中断状态
线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出了!至于目标线程接收到通知之后如何处理,则完全由目标线程自己决定,这点很重要,如果中断后,线程立即无条件退出,那不就和stop
一样了嘛!
Thread t1 = new Thread(){
@Override
public void run() {
while (true) {
if (this.isInterrupted()) {
System.out.println("中断");
break;
}
}
}
};
t1.start();
// 睡一秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
另一种中断方法
通过变量来控制线程是否停止,线程在睡眠的过程中,是无法通过判断变量的改变来结束的。所以此时只能使用interrupt
方法来中断线程了。
final Thread t1 = new Thread() {
@Override
public void run() {
while (FLAG) {
try {
// 睡 200秒
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 睡眠结束自动退出
System.out.println("退出了");
break;
}
}
};
t1.setName("t1");
t1.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
FLAG = false;
但使用interrupt
方法需要注意的是:sleep
方法由于中断而抛出异常之后,线程的中断标志会被清除(置为false
),所以在异常中需要执行this.interrupt()
方法,将中断标志位置为true
;
final Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
try {
// 睡 200秒
Thread.sleep(200000);
} catch (InterruptedException e) {
this.interrupt();
e.printStackTrace();
}
if (this.isInterrupted()) {
System.out.println("退出了");
break;
}
}
}
};
t1.setName("t1");
t1.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
等待wait
和通知notify
等待方法和通知方法,是支持多线程之间的协作而产生方法,这两个方法并不是
Thread
类中的,而是定义在Object
类中的。这意味着所有的对象都可以调用这两个方法。
public final void wait() throws InterruptedException;
public final native void notify();
**当一个对象实例上调用wait()
方法之后,当前线程就会被阻塞。**解释一下,就是当在线程A中调用了,object.wait()
,那么线程A就会停止继续执行,转为等待状态,线程A会持续等待,一直等待到其他线程调用了object.notify()
方法为止,这时object
就成为了多个线程之间有效的通信手段。
注意:如果有多个线程同时等待object.notify()
,此时会随机唤醒其中一个,这个选择是随机的,不公平的。
除notify()
方法外,Object
对象还有一个nofiyAll()
方法,它和notify()
方法的功能类似,不同的是,它会唤醒在这个等待队列中所有等待的线程,而不是随机选择一个。
wait
方法的使用规则:Object.wait()
方法并不能随便调用。它必须包含在对应的synchronize
语句汇总,无论是wait()
方法或者notify()
方法都需要首先获取目标对象的一个监视器。因为wait
方法会释放锁,所以不会产生死锁。
public static Object o = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (o) {
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " start");
try {
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " 在等待");
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " end");
}
}, "t1").start();
new Thread(() -> {
synchronized (o) {
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " start");
o.notify();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " end");
}
}, "t2").start();
}
输出结果:可以看到即使线程t2
调用了notify
方法,此时t1
也没有去执行,而是在t2
线程执行完毕,释放了o
的锁,t1
获得了这个锁之后,才继续运行。sleep
是不会释放锁的,这也就阐述了关于wait
和sleep
的区别。
1665408621232 当前线程为 t1 start
1665408621232 当前线程为 t1 在等待
1665408621232 当前线程为 t2 start
1665408623240 当前线程为 t2 end
1665408623240 当前线程为 t1 end
等待线程结束join
很多时候,一个线程的输入可能非常依赖于另外一个或者多个线程的输出,此时,这个线程就需要等待依赖的线程执行完毕,才能继续执行。jdk
提供了join()
操作来实现这个功能
public final void join() throws InterruptedException; // 当前线程一直等待,直到被等待线程结束。
public final synchronized void join(long millis) throws InterruptedException; // 当前线程等待多少时间,超过指定时间,会停止等待,继续执行
public static void main(String[] args) {
final Thread t1 = new Thread(() -> {
synchronized (o) {
System.out.println("当前线程为 " + Thread.currentThread().getName() + " start");
try {
System.out.println("当前线程为 " + Thread.currentThread().getName() + " 在睡觉");
// 睡他个10秒钟
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + " 当前线程为 " + Thread.currentThread().getName() + " end");
}
}, "t1");
t1.start();
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 被阻塞的是主线程,而不是调用者,注意 注意 注意!重要的事情说三遍
System.out.println("主线程 结束了");
}
而带参的join
方法的源码如下:使用了synchronized
,且等待完毕之后会自动调用notify
方法
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
// 参数判断
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 等待0秒,也就是让当前线程停一下,立即唤醒
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
// 判断线程是否或者
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
// 等待delay时间
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
谦让yeild
yield
是谦让的意思,这是一个静态方法,一旦执行,它会让当前线程出让CPU
,但需要注意的是,出让CPU
并不是说不让当前线程执行了,当前线程在出让CPU
后,还会进行CPU
资源的争夺,但是能否再抢到CPU
的执行权就不一定了。
用户线程和守护线程
Java
线程分为用户线程和守护线程,线程的daemon
属性为true
表示是守护线程,false
表示是用户线程守护线程:是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程
用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作
注意:设置线程的
daemon
属性一定在start之前设置,不然会抛出IllegalThreadStateException
异常
获取线程的方法
- 继承
Thread
类,重写run
方法 - 实现
Runnable
接口,重写run
方法 - 实现
Callable<T>
接口,重写call
方法 - 线程池中获取
着重介绍下3和4
实现Callable<T>
接口,重写call
方法
public class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 100;
}
}
callable
接口与runnable
接口的区别?
- 是否有返回值,
callable
有返回值,runnable
没有 callable
方法抛出异常,runnable
不抛出异常- 实现的方法不一样,注意
callable
是实现了call
方法,runnable
实现run
方法
线程池获取ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads
方法,线程池会提前把核心线程都创造好,并启动maximumPoolSize
:线程池允许创建的最大线程数,此值必须大于等于1。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了keepAliveTime
:多余的空闲线程的存活时间,当前池中线程数量超过corePoolSize
时,当空闲时间,达到keepAliveTime
时,多余线程会被销毁直到只剩下corePoolSize
个线程为止,如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率unit
:keepAliveTIme
的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit
,这个枚举也经常使用workQueue
:任务队列,被提交但尚未被执行的任务,用于缓存待处理任务的阻塞队列threadFactory
:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可,可以通过线程工厂给每个创建出来的线程设置更有意义的名字handler
:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize
)时如何来拒绝请求执行的runnable
的策略
线程池的拒绝策略
等待队列已经排满了,再也塞不下新任务了,同时,线程池中的
max
线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。
AbortPolicy
(默认):直接抛出RejectedExecutionException
异常阻止系统正常运行
CallerRunsPolicy
:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
DiscardOldestPolicy
:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
DiscardPolicy
:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
参数的合理配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
1,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r, executors) -> {
//自定义拒绝策略
//记录一下无法处理的任务
System.out.println("无法处理的任务:" + r.toString());
});
要想合理的配置线程池,需要先分析任务的特性,可以从以下几个角度分析:
- 任务的性质:
CPU
密集型任务、IO
密集型任务和混合型任务 - 任务的优先级:高、中、低
- 任务的执行时间:长、中、短
- 任务的依赖性:是否依赖其他的系统资源,如数据库连接。
性质不同任务可以用不同规模的线程池分开处理。CPU
密集型任务应 该尽可能小的线程,如配置cpu
数量+1个线程的线程池。由于IO密集型任务并不是一直在执行任务,不能让cpu
闲着,则应配置尽可能多的线程,如:cup
数量*2。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()
方法获取cpu
数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。
使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无解队列,任务太多的时候可能导致系统OOM
,直接让系统宕机。
线程池汇总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有消息的使用机器的性能。在Java Concurrency inPractice
书中给出了估算线程池大小的公式:
Ncpu = CUP的数量
Ucpu = 目标CPU的使用率,0<=Ucpu<=1
W/C = 等待时间与计算时间的比例
为保存处理器达到期望的使用率,最有的线程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
二、CompletableFuture
Future
和Callable
接口
Future
接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
Callable
接口中定义了需要有返回的任务需要实现的方法,比如在主线程开启一个子线程去执行别的任务,而主线程继续去做其他事情
此时,如果我们想要启动多个异步线程,同时每个线程都要有返回结果,因为我们的某个子线程需要依赖其他线程的结果,这时候就需要找到一个实现了Future
和Callable
的类,这个类就是FutureTask
FutureTask
的两种构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
案例1
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("in ---------");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "yes";
});
Thread t1 = new Thread(futureTask);
t1.start();
// 根据输出结果,可以看到get方法是阻塞住了,需要等待t1完成,才能继续执行下去
System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get());
// 设置时间,过了这个时间,自动抛出异常
System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));
System.out.println(Thread.currentThread().getName() + "运行");
结论:get
方法会阻塞当前线程,等待调用get
方法的线程完成之后,返回结果在继续。所以一般使用get
方法需要放在最后面。
案例2
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("in ---------");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "yes";
});
Thread t1 = new Thread(futureTask);
t1.start();
System.out.println(Thread.currentThread().getName() + "运行");
// 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
while (true) {
if (futureTask.isDone()) { // 判断是否完成的方法
System.out.println(futureTask.get());
break;
}
}
结论:用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,但这种方式很消耗资源,不推荐使用
CompletableFuture
类说明
FutureTask
阻塞的方式获取结果的方法并不能满足我们想要的,在java8
中提供了一个类,它有非常强大的Future
拓展功能。
首先是创建这个类的核心静态方法
不推荐使用
new
方式构造这个类,而是通过下面的静态方法构建
runAsync
无 返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
supplyAsync
有 返回值
executor
参数为线程池,如果不指定对应的线程池,则使用ForkJoinPoolmonPool()
默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
Code
无 返回值,泛型中的参数为Void
,注意方法是不同的,这里使用的是runAsync
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in -----");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----- over ------");
});
System.out.println(future.get());
Code
有 返回值,泛型中的参数为指定类型,以Integer
为例,注意方法是不同的,这里使用的是supplyAsync
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in -----");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----- over ------");
return 1;
});
System.out.println(future.get());
CompletableFuture
当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
解释,根据调用链来看,
whenComplete
是当完成时,其中的参数是结果和异常,exceptionally
是在出现异常的时候,e
为异常的对象
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in -----");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
System.out.println("----- over ------");
if (result > 2) {
throw new RuntimeException("异常 ");
}
return 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("result" + v);
}
}).exceptionally(e -> {
System.out.println("异常" + e.getMessage());
return 123;
});
System.out.println(future.get());
其中CompletableFuture
获取返回结果的方法有两个:join
和get
。他们的区别就是get
会抛出异常,join
不需要。
CompletableFuture
的优点
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
经典案例
串行完成任务和并行完成任务,通过多线程并行的方式缩短任务时间,提高效率
代码
public class Demo02 {
// 模拟不同任务
static List<Task> taskList = Arrays.asList(
new Task("读书", "看JUC编程"),
new Task("烧水", "放电水壶烧"),
new Task("吃东西", "炫")
);
public static void main(String[] args) {
long start = System.currentTimeMillis();
// ontByOne(); 6070
together(); // 2066
long end = System.currentTimeMillis();
System.out.println(end - start);
}
private static List<String> ontByOne() {
return taskList.stream()
.map(task -> {
// 模拟一个任务 花费2秒钟
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return task.getTaskName();
})
.collect(Collectors.toList());
}
private static List<String> together() {
return taskList.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
// 模拟一个任务 花费2秒钟
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return task.getTaskName();
}
)).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
}
}
class Task {
private String taskName;
private String taskInfo;
public Task() {
}
public Task(String taskName, String taskInfo) {
this.taskName = taskName;
this.taskInfo = taskInfo;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public String getTaskInfo() {
return taskInfo;
}
public void setTaskInfo(String taskInfo) {
this.taskInfo = taskInfo;
}
}
CompletableFuture
常用方法
获得结果和触发计算
// 等待线程完成
public T get()
// 等待时间,超时直接异常
public T get(long timeout, TimeUnit unit)
// 没有计算完成的情况下,返回valueIfAbsent
// 立即获取结果不阻塞 计算完,返回计算完成后的结果 没算完,返回设定的valueIfAbsent值
public T getNow(T valueIfAbsent)
// 获取,和get相同,但不抛出异常
public T join()
代码
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 123;
});
// 线程1秒执行完,但这里阻塞了2秒,线程已经执行完毕
// 可以将阻塞时间注释,即可看到getNow的效果
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(completableFuture.getNow(444));
主动触发计算
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 533;
});
// 用来阻塞,让线程有充分的时间完成,注释掉可以查看到complete方法的效果
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 当get方法被阻塞时,complete会立即打断get方法,并返回value 444
System.out.println(completableFuture.complete(444) + "\t" + completableFuture.get());
对计算结果进行处理
串行化处理的方法包括thenApply
接下来怎么做,whenCompleteAsync
完成时,exceptionally
异常时。
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("步骤1");
return 1;
}).thenApply(f -> {
System.out.println("步骤2");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("步骤3");
return f + 2;
}).whenCompleteAsync((v, e) -> {
System.out.println("最终结果: " + v);
}).exceptionally(e -> {
System.out.println("异常,返回null");
return null;
});
System.out.println("-----主线程结束-----");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
有异常的情况,可以使用handle
有异常也可以往下一步走,根据带的异常参数可以进一步处理
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("步骤1");
return 1;
}).handle((f, e) -> {
int age = 10 / 0; // 异常
System.out.println("步骤2");
return f + 1;
}).handle((f, e) -> {
System.out.println("步骤3");
return f + 2;
}).whenCompleteAsync((v, e) -> {
System.out.println("最终结果: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束-----");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果
// 任务 A 执行完执行 B,并且 B 不需要 A 的结果
thenRun(Runnable runnable)
// 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
thenAccept(Consumer action)
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
thenApply(Function fn)
演示
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenAccept(System.out::println);
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
对计算速度进行选用
applyToEither
:谁快用谁
CompletableFuture<Integer> t1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> t2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = t1.applyToEither(t2, f -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
return f + 1;
});
System.out.println(Thread.currentThread().getName() + "\t" + result.get());
对计算结果进行合并
两个CompletionStage
任务都完成后,最终能把两个任务的结果一起交给thenCombine
来处理,先完成的先等着,等待其它分支任务
CompletableFuture<Integer> t1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
return 1;
});
CompletableFuture<Integer> t2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
return 2;
});
CompletableFuture<Integer> result = t1.thenCombine(t2, (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----- in ----- ");
return x + y;
});
System.out.println(result.get());
三、锁
乐观锁和悲观锁
悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
悲观锁的实现方式
synchronized
关键字Lock
的实现类都是悲观锁
适合写操作多的场景,先加锁可以保证写操作时数据正确。显示的锁定之后再操作同步资源。
乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作
乐观锁的实现方式
版本号机制Version
。(只要有人提交了就会修改版本号,可以解决ABA
问题)
ABA
问题:再CAS
中想读取一个值A
,想把值A
变为C
,不能保证读取时的A
就是赋值时的A
,中间可能有个线程将A
变为B
再变为A
。
解决方法:Juc
包提供了一个AtomicStampedReference
,原子更新带有版本号的引用类型,通过控制版本值的变化来解决ABA
问题。
最常采用的是CAS
算法,Java
原子类中的递增操作就通过CAS
自旋实现的。
适合读操作多的场景,不加锁的性能特点能够使其操作的性能大幅提升。
关于锁的8中情况
线程 操作 资源类
方法类:这里为了方便区分静态方法,我分别写了静态和普通方法
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(3L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送邮件");
}
public synchronized void sendSMS() {
System.out.println("发送短信");
}
public static synchronized void sendStaticEmail() {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送邮件");
}
public synchronized void sendStaticSMS() {
System.out.println("发送短信");
}
}
1)标准访问有ab
两个线程,请问先打印邮件还是短信? 答:邮件
在同一时刻,只有一个线程能抢到
synchronized
的资源。
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
// 暂停一下,保证a线程先启动
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone.sendSMS();
}, "b").start();
}
a
里面故意停3秒?邮件
a
抢到了锁资源,没有释放,所以b
只能干巴巴等着
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
// 暂停一下,保证a线程先启动
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
phone.sendSMS();
}, "b").start();
}
添加一个普通的hello
方法,请问先打印邮件还是hello
?hello
普通方法不抢夺资源,所以不影响
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
new Thread(()->{
phone.hello();
}, "b").start();
}
有两部手机,请问先打印邮件(这里有个3秒延迟)还是短信? 短信
锁在两个不同的对象/两个不同的资源上,不产生竞争条件
public static void main(String[] args) {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
// 暂停一下,保证a线程先启动
//try {
// TimeUnit.SECONDS.sleep(1L);
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
new Thread(()->{
phone2.sendSMS();
}, "b").start();
}
有两个静态同步方法(synchroized
前加static
,3秒延迟也在),先打印邮件还是短信?邮件
争夺的是类锁,其实有无手机实例都行,抢夺的都是类锁,属于类资源
public static void main(String[] args) {
new Thread(()->{
Phone.sendStaticEmail();
}, "a").start();
new Thread(()->{
Phone.sendStaticSMS();
}, "b").start();
}
两个手机,有两个静态同步方法(synchroized
前加static
,3秒延迟也在),有1部手机,先打印邮件还是短信?邮件
其实这个意义不大,因为这种方式和上面是一样的,虽然通过实例可以调用静态方法,但隐式参数还是
Phone
类本身,和实例没有关系
public static void main(String[] args) {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(()->{
phone.sendStaticEmail();
}, "a").start();
new Thread(()->{
phone2.sendStaticSMS();
}, "b").start();
}
一个静态同步方法,一个普通同步方法,请问先打印邮件还是手机?短信
注意,普通方法里有暂停3秒的方法,所以输出是短信
实例锁和类锁是互不干扰的,这个在JVM中可以很好的解释,实例对象放在堆中,类信息放在方法区中
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
new Thread(()->{
Phone.sendStaticSMS();
}, "b").start();
}
两个手机,一个静态同步方法,一个普通同步方法,请问先打印邮件还是手机?短信
不产生竞争条件
public static void main(String[] args) {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(()->{
phone.sendEmail();
}, "a").start();
new Thread(()->{
phone2.sendStaticSMS();
}, "b").start();
}
总结
- 作用域实例方法,当前实例加锁,进入同步代码块前要获得当前实例的锁。
- 作用于代码块,对括号里配置的对象加锁。
- 作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁。
分析synchronized实现
- 文件反编译
javap -c ***.class
文件反编译,-c表示对代码进行反汇编 - 假如需要更多信息
javap -v ***.class
,-v即-verbose输出附加信息(包括行号、本地变量表、反汇编等详细信息)
代码:
public class LockSyncDemo {
Object object = new Object();
public void m1(){
synchronized (object){
System.out.println("-----hello synchronized ------");
}
}
public static void main(String[] args) {
}
}
反编译结果:javap -c target/classes/com/ljq/demo/JUC/LockSyncDemo.class
E:\code\study>javap -c target/classes/com/ljq/demo/JUC/LockSyncDemo.class
Compiled from "LockSyncDemo.java"
public class com.ljq.demo.JUC.LockSyncDemo {
java.lang.Object object;
public com.ljq.demo.JUC.LockSyncDemo();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: new #2 // class java/lang/Object
8: dup
9: invokespecial #1 // Method java/lang/Object."<init>":()V
12: putfield #3 // Field object:Ljava/lang/Object;
15: return
public void m1();
Code:
0: aload_0
1: getfield #3 // Field object:Ljava/lang/Object;
4: dup
5: astore_1
6: monitorenter
7: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
10: ldc #5 // String -----hello synchronized ------
12: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
Exception table:
from to target type
7 17 20 any
20 23 20 any
public static void main(java.lang.String[]);
Code:
0: return
}
总结:synchronized
同步代码块,实现使用的是moniterenter
和moniterexit
指令(moniterexit
可能有两个)
详细参数:javap -v target/classes/com/ljq/demo/JUC/LockSyncDemo.class
E:\code\study>javap -v target/classes/com/ljq/demo/JUC/LockSyncDemo.class
Classfile /E:/code/study/target/classes/com/ljq/demo/JUC/LockSyncDemo.class
Last modified 2022-10-14; size 832 bytes
MD5 checksum d71a521eddfb8d5e16b8d20a17417bfb
Compiled from "LockSyncDemo.java"
public class com.ljq.demo.JUC.LockSyncDemo
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #2.#28 // java/lang/Object."<init>":()V
#2 = Class #29 // java/lang/Object
#3 = Fieldref #7.#30 // com/ljq/demo/JUC/LockSyncDemo.object:Ljava/lang/Object;
#4 = Fieldref #31.#32 // java/lang/System.out:Ljava/io/PrintStream;
#5 = String #33 // -----hello synchronized ------
#6 = Methodref #34.#35 // java/io/PrintStream.println:(Ljava/lang/String;)V
#7 = Class #36 // com/ljq/demo/JUC/LockSyncDemo
#8 = Utf8 object
#9 = Utf8 Ljava/lang/Object;
#10 = Utf8 <init>
#11 = Utf8 ()V
#12 = Utf8 Code
#13 = Utf8 LineNumberTable
#14 = Utf8 LocalVariableTable
#15 = Utf8 this
#16 = Utf8 Lcom/ljq/demo/JUC/LockSyncDemo;
#17 = Utf8 m1
#18 = Utf8 StackMapTable
#19 = Class #36 // com/ljq/demo/JUC/LockSyncDemo
#20 = Class #29 // java/lang/Object
#21 = Class #37 // java/lang/Throwable
#22 = Utf8 main
#23 = Utf8 ([Ljava/lang/String;)V
#24 = Utf8 args
#25 = Utf8 [Ljava/lang/String;
#26 = Utf8 SourceFile
#27 = Utf8 LockSyncDemo.java
#28 = NameAndType #10:#11 // "<init>":()V
#29 = Utf8 java/lang/Object
#30 = NameAndType #8:#9 // object:Ljava/lang/Object;
#31 = Class #38 // java/lang/System
#32 = NameAndType #39:#40 // out:Ljava/io/PrintStream;
#33 = Utf8 -----hello synchronized ------
#34 = Class #41 // java/io/PrintStream
#35 = NameAndType #42:#43 // println:(Ljava/lang/String;)V
#36 = Utf8 com/ljq/demo/JUC/LockSyncDemo
#37 = Utf8 java/lang/Throwable
#38 = Utf8 java/lang/System
#39 = Utf8 out
#40 = Utf8 Ljava/io/PrintStream;
#41 = Utf8 java/io/PrintStream
#42 = Utf8 println
#43 = Utf8 (Ljava/lang/String;)V
{
java.lang.Object object;
descriptor: Ljava/lang/Object;
flags:
public com.ljq.demo.JUC.LockSyncDemo();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: new #2 // class java/lang/Object
8: dup
9: invokespecial #1 // Method java/lang/Object."<init>":()V
12: putfield #3 // Field object:Ljava/lang/Object;
15: return
LineNumberTable:
line 9: 0
line 10: 4
LocalVariableTable:
Start Length Slot Name Signature
0 16 0 this Lcom/ljq/demo/JUC/LockSyncDemo;
public void m1();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: getfield #3 // Field object:Ljava/lang/Object;
4: dup
5: astore_1
6: monitorenter
7: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
10: ldc #5 // String -----hello synchronized ------
12: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
Exception table:
from to target type
7 17 20 any
20 23 20 any
LineNumberTable:
line 13: 0
line 14: 7
line 15: 15
line 16: 25
LocalVariableTable:
Start Length Slot Name Signature
0 26 0 this Lcom/ljq/demo/JUC/LockSyncDemo;
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 20
locals = [ class com/ljq/demo/JUC/LockSyncDemo, class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 20: 0
LocalVariableTable:
Start Length Slot Name Signature
0 1 0 args [Ljava/lang/String;
}
SourceFile: "LockSyncDemo.java"
总结:调用指令将会检查方法的访问标志是否被设置。如果设置了,执行线程会将先持有monitore
然后再执行方法,最后在方法完成(无论是正常完成还是非正常完成)时释放monitor
管程概念
管程:Monitor
(监视器),也就是我们平时说的锁。监视器锁
信号量及其操作原语“封装”在一个对象内部)管程实现了在一个时间点,最多只有一个线程在执行管程的某个子程序。 管程提供了一种机制,管程可以看做一个软件模块,它是将共享的变量和对于这些共享变量的操作封装起来,形成一个具有一定接口的功能模块,进程可以调用管程来实现进程级别的并发控制。
执行线程就要求先成功持有管程,然后才能执行方法,最后当方法完成(无论是正常完成还是非正常完成)时释放管理。在方法执行期间,执行线程持有了管程,其他任何线程都无法再获取到同一个管程。
为什么任何一个对象都可以成为一个锁?
- Java
Object
类是所有类的父类,也就是说 Java 的所有类都继承了Object
,子类可以使用 Object 的所有方法。 ObjectMonitor.java
→ObjectMonitor.cpp
→objectMonitor.hpp
ObjectMonitor() {
_header = NULL;
_count = 0; //用来记录该线程获取锁的次数
_waiters = 0,
_recursions = 0;//锁的重入次数
_object = NULL;
_owner = NULL; //------最重要的----指向持有ObjectMonitor对象的线程,记录哪个线程持有了我
_WaitSet = NULL; //存放处于wait状态的线程队列
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ;//存放处于等待锁block状态的线程队列
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
公平锁和非公平锁
抢票案例
class Ticket {
private int number = 30;
// 非公平锁
ReentrantLock lock = new ReentrantLock();
//ReentrantLock lock = new ReentrantLock(true);
public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出第:\t" + (number--) + "\t 还剩下:" + number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SaleTicketDemo {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 35; i++) ticket.sale();
}, "a").start();
new Thread(() -> {
for (int i = 0; i < 35; i++) ticket.sale();
}, "b").start();
new Thread(() -> {
for (int i = 0; i < 35; i++) ticket.sale();
}, "c").start();
}
}
非公平锁
- 默认是非公平锁
- 非公平锁可以插队,买卖票不均匀。
- 是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并发环境下,有可能造成优先级翻转或饥饿的状态(某个线程一直得不到锁)
公平锁
ReentrantLock lock = new ReentrantLock(true);
- 买卖票一开始
a
占优,后面a b c a b c a b c
均匀分布 - 是指多个线程按照申请锁的顺序来获取锁,这里类似排队买票,先来的人先买后来的人在队尾排着,这是公平的。
为什么会有公平锁/非公平锁的设计?为什么默认是非公平?
- 恢复挂起的线程到真正锁的获取还是有时间差的,从开发人员来看这个时间微乎其微,但是从
CPU
的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用CPU
的时间片,尽量减少CPU
空闲状态时间。 - 使用多线程很重要的考量点是线程切换的开销,当采用非公平锁时,当1个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的概率就变得非常大,所以就减少了线程的开销。
什么时候用公平?什么时候用非公平?
如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了;否则那就用公平锁,大家公平使用。
可重入锁(递归锁)
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized
修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。
所以Java
中ReentrantLock
和synchronized
都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
可重入锁详细解释
- 可:可以
- 重:再次
- 入:进入
- 锁:同步锁
- 进入什么:进入同步域(即同步代码块/方法或显示锁锁定的代码)
- 一句话:一个线程中的多个流程可以获取同一把锁,持有这把锁可以再次进入。自己可以获取自己的内部锁。
隐式锁Synchronized
是java
中的关键字,默认是可重入锁,即隐式锁
同步块中的写法
public class Demo01 {
public static void main(String[] args)
{
final Object objectLockA = new Object();
new Thread(() -> {
synchronized (objectLockA)
{
System.out.println("-----外层调用-----");
synchronized (objectLockA)
{
System.out.println("-----中层调用-----");
synchronized (objectLockA)
{
System.out.println("-----内层调用-----");
}
}
}
},"a").start();
}
}
同步方法中的写法
public class ReEntryLockDemo
{
public synchronized void m1()
{
//指的是可重复可递归调用的锁,在外层使用之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁
System.out.println(Thread.currentThread().getName()+"\t"+"-----come in m1");
m2();
System.out.println(Thread.currentThread().getName()+"\t-----end m1");
}
public synchronized void m2()
{
System.out.println("-----m2");
m3();
}
public synchronized void m3()
{
System.out.println("-----m3");
}
public static void main(String[] args)
{
ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();
reEntryLockDemo.m1();
}
}
synchronized
的重入实现机理
ObjectMonitor() {
_header = NULL;
_count = 0; // 用来记录该线程获取锁的次数
_waiters = 0,
_recursions = 0;// 锁的重入次数
_object = NULL;
_owner = NULL; // ------最重要的----指向持有ObjectMonitor对象的线程,记录哪个线程持有了我
_WaitSet = NULL; //存放处于wait状态的线程队列
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ;// 存放处于等待锁block状态的线程队列
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
-
ObjectMoitor.hpp
底层:每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。_count _owner
-
首次加锁:当执行
monitorenter
时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。 -
重入:在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么
Java
虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。 -
释放锁:当执行
monitorexit
时,Java
虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
显式锁Lock
:也有ReentrantLock
这样的可重入锁
public class ReEntryLockDemo {
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
{
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t----come in 外层调用");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t------come in 内层调用");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}, "t1").start();
}
}
}
假如lock
unlock
不成对,单线程情况下问题不大,但多线程下会出问题
public class ReEntryLockDemo {
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t----come in 外层调用");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t------come in 内层调用");
} finally {
//lock.unlock();
}
} finally {
lock.unlock();//-------------------------不成对|多线程情况
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try
{
System.out.println("t2 ----外层调用lock");
}finally {
lock.unlock();
}
},"t2").start();
}
}
死锁及排查
死锁
- 是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
- a跟b两个资源互相请求对方的资源
死锁产生的原因
- 系统资源不足
- 进程运行推进的顺序不合适
- 资源分配不当
死锁的代码
public class DeadLockDemo {
public static void main(String[] args) {
Object object1 = new Object();
Object object2 = new Object();
new Thread(()->{
synchronized (object1){
System.out.println(Thread.currentThread().getName()+"\t 持有a锁,想获得b锁");
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
synchronized (object2){
System.out.println(Thread.currentThread().getName()+"\t 成功获得b锁");
}
}
},"A").start();
new Thread(()->{
synchronized (object2){
System.out.println(Thread.currentThread().getName()+"\t 持有b锁,想获得a锁");
synchronized (object1){
System.out.println(Thread.currentThread().getName()+"\t 成功获得a锁");
}
}
},"B").start();
}
}
如何排查死锁
第一种方式:命令行的方法,在终端中查看
jps -l
查看当前进程运行状况jstack 进程编号
查看该进程信息
E:\code\study>jps -l
24784 sun.tools.jps.Jps
24880
19528 org.jetbrains.jps.cmdline.Launcher
13692 com.ljq.demo.JUC.DeadLockDemo
E:\code\study>jstack 13692
。。。。东西太多了,省略
Found 1 deadlock.
第二种方式:java自带的图形化界面
win
+ r
输入jconsole
,打开图形化工具,打开线程
,点击 检测死锁
。
总结
指针指向monitor
对象(也称为管程或监视器锁)的起始地址。每个对象都存在着一个monitor
与之关联,当一个monitor
被某个线程持有后,它便处于锁定状态。在Java虚拟机(HotSpot
)中,monitor
是由ObjectMonitor
实现的,其主要数据结构如下(位于HotSpot
虚拟机源码ObjectMonitor.hpp
,C++
实现的)
四、线程中断和LockSupport
线程中断的三个方法:
修饰符 | 方法名 | 描述 |
---|---|---|
void | interrupt() | 中断此线程 |
static boolean | interrupted() | 测试当前线程是否已被中断 |
boolean | isInterrupted() | 测试此线程是否已经被中断 |
什么是中断机制?
一个线程不应该由其他线程强制中断或停止,这会造成意想不到的后果,而是应该由自己进行自行停止。所以在JDK
中:Thread.stop
, Thread.suspend
, Thread.resume
都已经被废弃了。
在java
中,没有办法立即停止一条线程,然而停止线程是很重要的方法,如取消一个消耗资源非常大的方法。因此java
提供了一种用于停止小城的协商机制——中断
中断只是一种协作协商机制,java
没有给中断增加任何语法,中断的过程完全需要程序员自己实现。
中断方法说明:
方法签名 | 说明 |
---|---|
public void interrupt() | 实例方法,实例方法interrupt() 仅仅是设置线程的中断状态为true ,发起一个协商而不会立刻停止线程 |
public static boolean interrupted() | 静态方法,Thread.interrupted(); 判断线程是否被中断,并清除当前中断状态这个方法做了两件事:1)返回当前线程的中断状态。2)将当前线程的中断状态设为false(这个方法有点不好理解,因为连续调用两次的结果可能不一样。) |
public boolean isInterrupted() | 实例方法,判断当前线程是否被中断(通过检查中断标志位) |
如何停止中断运行中的线程?
使用volatile
关键字,以下是volatile
关键字的作用
- 保证变量的内存可见性
- 禁止指令重排序
private static volatile boolean isStop = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
if (isStop) {
System.out.println(Thread.currentThread().getName() + "\t isStop被修改为true,程序终止 ----");
break;
}
System.out.println(Thread.currentThread().getName() + "\t 正在运行-------");
}
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// 线程中断
isStop = true;
}, "t2").start();
}
通过AtomicBoolean
(原子布尔型)
private static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println(Thread.currentThread().getName() + "\t atomicBoolean被修改为true,程序终止 ----");
break;
}
System.out.println(Thread.currentThread().getName() + "\t 正在运行-------");
}
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// 线程中断
atomicBoolean.set(true);
}, "t2").start();
}
通过Thread
类自带的中断api
方法实现,也就是上面介绍的interrupt
方法
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + "\t interrupt 标志被修改为true,程序终止 ----");
break;
}
System.out.println(Thread.currentThread().getName() + "\t 正在运行-------");
}
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
t1.interrupt();
}, "t2").start();
中断方法的源码分析
interrupt
方法:底层调用了native
方法interrupt0
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
isInterrupted
方法,底层调用了isInterrupted
方法,native
的isInterrupted
方法参数含义为是否重置中断标志,并重新设置为false
。
public boolean isInterrupted() {
return isInterrupted(false);
}
private native boolean isInterrupted(boolean ClearInterrupted);
使用中断方法的说明
当对一个线程,调用 interrupt()
时:
-
如果线程处于正常活动状态,那么会将该线程的中断标志设置为
true
,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。所以,interrupt()
并不能真正的中断线程,需要被调用的线程自己进行配合才行。 -
如果线程处于被阻塞状态(例如处于
sleep
,wait
,join
等状态),在别的线程中调用当前线程对象的interrupt
方法,那么线程将立即退出被阻塞状态(中断状态将被清除),并抛出一个InterruptedException
异常。 -
中断不活动的线程不会产生任何影响
当前线程的中断标识为true
,是不是线程就立刻停止?
否,仅仅是设置了一个中断状态。
Thread t1 = new Thread(() -> {
for (int i = 0; i < 300; i++) {
System.out.println("---------" + i);
}
System.out.println("我是线程内部方法,侦测到线程中断状态为 " + Thread.currentThread().isInterrupted());
}, "t1");
t1.start();
System.out.println("第一次 t1的中断状态是 ->" + t1.isInterrupted());
t1.interrupt();
try {
TimeUnit.MILLISECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二次 调用了interrupt方法 t1的中断状态是 ->" + t1.isInterrupted());
try {
TimeUnit.SECONDS.sleep(3L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第三次 睡了3秒之后 t1的中断状态是 ->" + t1.isInterrupted());
输出:
第一次 t1的中断状态是 ->false
---------0
// 省略
---------197
第二次 调用了interrupt方法 t1的中断状态是 ->true
---------198
// 省略
---------299
我是线程内部方法,侦测到线程中断状态为 true
第三次 睡了3秒之后 t1的中断状态是 ->false
结论:
可以看到,在中断状态变化之后,程序依然执行,直到线程的方法体结束。
案例
请看下面中断失败的案例:
Thread t1 = new Thread(() -> {
int i = 0;
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("我是线程内部方法,侦测到线程中断状态为 " + Thread.currentThread().isInterrupted());
break;
}
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我还在运行呢 -> " + i++);
}
}, "t1");
t1.start();
try {
TimeUnit.MILLISECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// 创建线程
t1.interrupt();
},"t2").start();
结论:在线程抛出异常的时候,比如上文说到的sleep
, wait
, join
等状态,在别的线程中调用t1
的interrupt
方法,那么线程会立即退出被阻塞状态(中断状态将被清除),并抛出一个InterruptedException
异常。
如何解决?
在
catch
语句中重新调用interrupt
方法
Thread t1 = new Thread(() -> {
int i = 0;
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("我是线程内部方法,侦测到线程中断状态为 " + Thread.currentThread().isInterrupted());
break;
}
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 抛出异常后,重新设置线程的中断状态
e.printStackTrace();
}
System.out.println("我还在运行呢 -> " + i++);
}
}, "t1");
t1.start();
关于Thread.interrupted()
静态方法,Thread.interrupted()
判断线程是否被中断,并清除当前中断状态这个方法做了两件事:
-
返回当前线程的中断状态
-
将当前线程的中断状态设为false(这个方法有点不好理解,因为连续调用两次的结果可能不一样。)
简而言之:就是先返回当前中断状态,再重置中断状态,重新设置为false;
System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
System.out.println("-----1-----");
Thread.currentThread().interrupt();// 中断标志位设置为true
System.out.println("-----2-----");
System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
输出结果
main false // 当前状态为false
main false // 当前状态为false
-----1-----
-----2-----
main true // 当前状态为true,重置中断状态为false
main false // 当前状态为false
源码分析
可以看到
interrupted
和isInterrupted
底层都是通过native
的isInterrupted
方法来控制的,只不过参数不同。
isInterrupted(true)
:表示清空当前的中断状态
isInterrupted(false)
:表示不清空当前的中断状态这也就介绍了关于
interrupted
和isInterrupted
的不同地方
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
// ClearInterrupted是否清除中断状态,并设置为false
private native boolean isInterrupted(boolean ClearInterrupted);
public boolean isInterrupted() {
return isInterrupted(false);
}
线程通信(补充)
两个线程交替工作
synchronized
实现
class DataObj {
private int cnt = 0;
public synchronized void increment() throws InterruptedException {
// 判断
if (cnt != 0) {
this.wait();
}
// 干活
++cnt;
System.out.println(Thread.currentThread().getName() + "\t" + cnt);
// 通知
this.notify();
}
public synchronized void decrement() throws InterruptedException {
// 判断
if (cnt == 0) {
this.wait();
}
// 干活
--cnt;
System.out.println(Thread.currentThread().getName() +"\t" + cnt);
// 通知
this.notify();
}
}
public class Demo02 {
public static void main(String[] args) {
DataObj obj = new DataObj();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
try {
obj.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
try {
obj.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
缺点:上述代码中有一个缺点,那就是if
判断,如果在多个线程的环境下(如4个线程),那么会出现一个问题,虚假唤醒
原因:在判断过程中,突然有一添加的线程进到if
了,突然中断了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次
解决方式:中断和虚假唤醒是可能产生的,所以要用loop
循环,if
只判断一次,while
是只要唤醒就要拉回来再判断一次。if
换成while
将上面的代码使用Lock
方式实现,并改变判断方式
class DataObj03 {
private int cnt = 0;
private Lock lock = new ReentrantLock();
// 用来实现 类似wait 和 notify的方法
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
// 判断
lock.lock();
try {
while (cnt != 0) {
condition.await();
}
// 干活
++cnt;
System.out.println(Thread.currentThread().getName() + "\t" + cnt);
// 通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
// 判断
lock.lock();
try {
while (cnt != 1) {
condition.await();
}
// 干活
--cnt;
System.out.println(Thread.currentThread().getName() + "\t" + cnt);
// 通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Demo03 {
public static void main(String[] args) {
DataObj03 dataObj03 = new DataObj03();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
dataObj03.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
dataObj03.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
dataObj03.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
dataObj03.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
LockSupport
是什么?官方解释:用于创建锁和其他同步类的基本线程阻塞原语。
核心就是
park()
和unpark()
方法
park()
方法是阻塞线程unpark()
方法是解除阻塞线程
线程等待和唤醒的方法
- 使用
Object
中的wait()
方法让线程等待,使用Object中的notify()
方法唤醒线程 - 使用
JUC
包中Condition
的await()
方法让线程等待,使用signal()
方法唤醒线程 LockSupport
类可以阻塞当前线程以及唤醒指定被阻塞的线程
Object
类中的wait
和notify
方法实现线程等待和唤醒
Object objLock = new Object();
new Thread(() -> {
synchronized (objLock) {
System.out.println(Thread.currentThread().getName() + "\t -------线程执行了-------");
try {
objLock.wait(); // 等待并释放锁资源
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t" + "---被唤醒了---");
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (objLock) {
objLock.notify();
System.out.println(Thread.currentThread().getName() +" => 发送唤醒通知");
}
},"t2").start();
总结:
wait
和notify
方法是必须结合synchronized
进行使用的,如果外层的同步代码块被去掉了,那么将会抛出IllegalMonitorStateException
异常- 如果将
notify
方法和wait
方法的顺序进行调换,则会导致程序一直在等待唤醒,并一直处于循环中(先唤醒在等待)
Condition
接口中的await
后signal
方法实现线程的等待和唤醒
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在执行");
condition.await();
System.out.println(Thread.currentThread().getName() + " 我被唤醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
lock.lock();
try {
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+" -> 我要唤醒 t1");
}, "t2").start();
总结:
- 同理:
condition
中await
和signal
方法需要配置lock
和unlock
方法进行使用,否则会抛出IllegalMonitorStateException
异常 - 如果将
await
方法和signal
方法的顺序进行调换,则会导致程序一直在等待唤醒,并一直处于循环中(先唤醒在等待)
LockSupport
类中的park
等待和unpark
唤醒
将会解决 唤醒和等待 操作顺序不当造成的问题
LockSupport
是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport
类使用了一种名为Permit
(许可) 的概念来做到阻塞和唤醒线程的功能, 每个线程都有一个许可(permit
),permit
(许可)只有两个值1和0,默认是0。0 是阻塞,1是唤醒- 可以把许可看成是一种(0,1)信号量(
Semaphore
),但与Semaphore
不同的是,许可的累加上限是1。
源码:调用LockSupport.park()
时,发现它调用了unsafe类
,并且默认传了一个0
permit
默认是零,所以一开始调用park()
方法,当前线程就会阻塞,直到别的线程将当前线程的permit
设置为1时,park
方法会被唤醒,然后会将permit
再次设置为零并返回。
public static void park() {
UNSAFE.park(false, 0L);
}
调用LockSupport.unpark();
时,也调用了unsafe类
调用
unpark(thread)
方法后,就会将thread
线程的许可permit
设置成1
(注意多次调用unpark
方法,不会累加,permit
值还是1)会自动唤醒thread
线程,即之前阻塞中的LockSupport.park()
方法会立即返回。
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
案例代码:
常规使用方法
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 线程正在执行");
LockSupport.park(); // 阻塞线程
System.out.println(Thread.currentThread().getName() + " 线程被唤醒了");
}, "t1");
t1.start();
new Thread(() -> {
LockSupport.unpark(t1); // 发送凭证的方法
System.out.println(Thread.currentThread().getName() + " 我去唤醒t1");
}, "t2").start();
先唤醒后等待
运行下面的代码,我们可以发现,
t2
先去唤醒了t1
,此时t1
正在睡眠中,醒来之后,继续执行了下去先执行了
unpark(t1)
导致上面的park
方法形同虚设无效,时间是一样的。
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 线程正在执行" + );
LockSupport.park(); // 阻塞线程
System.out.println(Thread.currentThread().getName() + " 线程被唤醒了");
}, "t1");
t1.start();
new Thread(() -> {
LockSupport.unpark(t1); // 发送凭证的方法
System.out.println(Thread.currentThread().getName() + " 我去唤醒t1");
}, "t2").start();
输出结果
t2 我去唤醒t1
t1 线程正在执行1665922371663
t1 线程被唤醒了1665922371663
验证多个许可证是否有效的问题
每个线程都有一个相关的
permit
,permit
最多只有一个, 重复调用unpark
也不会积累凭证。
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 线程正在执行" + System.currentTimeMillis());
LockSupport.park(); // 阻塞线程
LockSupport.park(); // 阻塞线程
System.out.println(Thread.currentThread().getName() + " 线程被唤醒了" + System.currentTimeMillis());
}, "t1");
t1.start();
new Thread(() -> {
LockSupport.unpark(t1); // 发送凭证的方法
LockSupport.unpark(t1); // 发送凭证的方法
LockSupport.unpark(t1); // 发送凭证的方法
System.out.println(Thread.currentThread().getName() + " 我去唤醒t1");
}, "t2").start();
LockSupport
总结
LockSupport
是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport
是一个线程阻塞工具类, 所有的方法都是静态方法, 可以让线程在任意位置阻塞, 阻塞之后也有对应的唤醒方法。归根结
底,LockSupport
调用的Unsafe
中的native
代码。LockSupport
提供park()
和unpark()
方法实现阻塞线程和解除线程阻塞的过程LockSupport
和每个使用它的线程都有一个许可(permit
) 关联。- 每个线程都有一个相关的
permit
,permit
最多只有一个, 重复调用unpark
也不会积累凭证。 - 线程阻塞需要消耗凭证(
permit
) , 这个凭证最多只有1个。
最后实验(自己的猜想)
凭证每次只会存在一个,消耗完之后才会产生第二个(虽然是第二个,但有且只有一个),只要在被消耗完之后,在生产一个,这样下一个
park
就可以使用了
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 线程正在执行");
LockSupport.park(); // 阻塞线程
System.out.println(Thread.currentThread().getName() + "我被唤醒了第一次");
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.park(); // 阻塞线程
System.out.println(Thread.currentThread().getName() + " 线程被唤醒了 第二次");
}, "t1");
t1.start();
new Thread(() -> {
LockSupport.unpark(t1); // 发送凭证的方法
System.out.println(Thread.currentThread().getName() + " 第一次 我去唤醒t1");
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(t1); // 发送凭证的方法
System.out.println(Thread.currentThread().getName() + " 第二次 我去唤醒t1");
}, "t2").start();
五、内存模型JMM
JMM
(Java
内存模型Java Memory Model
,简称JMM
)本身是一种抽象的概念并不真实存在它仅仅描述的是一组约定或规范,通过这组规范定义了程序中(尤其是多线程)各个变量的读写访问方式并决定一个线程对共享变量的写入何时以及如何变成对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。
原则: JMM
的关键技术点都是围绕多线程的原子性、可见性和有序性展开的
能干嘛?
- 通过
JMM
来实现线程和主内存之间的抽象关系。 - 屏蔽各个硬件平台和操作系统的内存访问差异以实现让
Java
程序在各种平台下都能达到一致的内存访问效果。
JMM
规范中三大特性
可见性:是指当一个线程修改了某一个共享变量的值,其他线程是否能够自己知道该变更,JMM
规定了所有的变量都存储在主内存中。
Java
中普通的共享变量不保证可见性,因为数据修改被写入内存的时机是不确定的,多线程并发情况下很可能出现脏读,所以每个线程都有自己的工作内存,线程自己的工作内存中保存了该线程使用到的变量的主内存副本拷贝。线程对变量的所有操作(读取、赋值等)都必须在线程自己的工作内存中进行,而不能够直接读写主内存中的变量。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成。
原子性:指一个操作是不可中断的,即多线程环境下,操作不能被其他线程干扰。
有序性:对一个线程的执行代码而言,java
程序员总是习惯性的任务是从上到下,按照我们编写的顺序执行。但为了提供性能,编译器和处理器通常会对指令序列进行重新排序。指令重排可以保证串行语义一致,但多线程情况下语义是没办法保证的,即可能产生脏读,用人话来讲两行以上不相干的代码在执行的时候有可能先执行的不是第一条,不见得是从上到下顺序执行,执行顺序会被优化。
public static void mySort() {
int x = 11; // 语句1
int y = 14; // 语句2
x += 5; // 语句3
y = x * x; // 语句4
}
上述代码块中,如果执行顺序是以下顺序,会有什么影响?
-
1234 可以
-
2134 可以
-
1324 可以
-
语句4可以重排后编程第一条嘛?不可以。
单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。 处理器在进行重排序时必须要考虑指令之间的数据依赖性 ,多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测
JMM
规范中,多线程对变量的读写过程
读取过程
由于JVM
运行程序的实体是线程,而每个线程创建时JVM
都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到的线程自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。
JMM
定义了线程和主内存之间的抽象关系
- 线程之间的共享变量存储在主内存中(从硬件角度来说就是内存条)
- 每个线程都有一个私有的本地工作内存,本地工作内存中存储了该线程用来读/写共享变量的副本(从硬件角度来说就是
CPU
的缓存,比如寄存器、L1
、L2
、L3
缓存等)
总结
- 我们定义的所有共享变量都储存在物理主内存中
- 每个线程都有自己独立的工作内存,里面保存该线程使用到的变量的副本(主内存中该变量的一份拷贝)
- 线程对共享变量所有的操作都必须先在线程自己的工作内存中进行后写回主内存,不能直接从主内存中读写(不能越级)
- 不同线程之间也无法直接访问其他线程的工作内存中的变量,线程间变量值的传递需要通过主内存来进行(同级不能相互访问)
JMM
规范下,多线程先行发生原则之happens-before
先行发生原则(happens-before
)被定义在了JMM
之中
如果Java
内存模型中所有的有序性都仅靠volatile
和synchronized
来完成,那么有很多操作都将会变得非常啰嗦,但是我们在编写Java
并发代码的时候并没有察觉到这一点。我们没有时时、处处、次次,添加volatile
和synchronized
来完成程序,这是因为Java
语言中JMM
原则下有一个**“先行发生”(Happens-Before
)的原则限制和规矩**
这个原则非常重要:它是判断数据是否存在竞争,线程是否安全的非常有用的手段。依赖这个原则,我们可以通过几条简单规则一揽子解决并发环境下两个操作之间是否可能存在冲突的所有问题,而不需要陷入Java
内存模型苦涩难懂的 底层编译原理之中。
总原则
-
如果一个操作
happens-before
另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。 -
两个操作之间存在
happens-before
关系,并不一定要按照happens-before
原则制定的顺序来执行。如果重排序之后的执行结果与按照happens-before
关系来执行的结果一致,那么这种重排序并不非法。
JMM
存在的天然存在的happens-before
关系
-
次序规则:一个线程内,按照代码顺序,写在前面的操作先行发生于写在后面的操作。
-
锁定规则:锁的获取的先后顺序
一个
unLock
操作先行发生于后面(这里的后面是指时间上的先后)对同一个锁的lock
操作(一个线程想要lock,肯定要等前面的锁unLock
释放这个资源)//对于同一把锁objectLock,threadA一定先unlock同一把锁后B才能获得该锁, A 先行发生于B synchronized (objectLock){ }
-
volatile变量规则:对一个
volatile
变量的写操作先行发生于后面对这个变量的读操作,前面的写对后面的读是可见的,这里的“后面”同样是指时间上的先后。 -
传递规则:如果操作
A
先行发生于操作B
,而操作B
又先行发生于操作C
,则可以得出操作A先行发生于操作C; -
线程启动规则(
Thread Start Rule
):Thread
对象的start()
方法先行发生于此线程的每一个动作 -
线程中断规则(
Thread Interruption Rule
):对线程interrupt()
方法的调用先行发生于被中断线程的代码检测到中断事件的发生;可以通过Thread.interrupted()
检测到是否发生中断 -
线程终止规则(
Thread Termination Rule
): 线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过Thread::join()
方法是否结束、Thread::isAlive()
的返回值等手段检测线程是否已经终止执行。 -
对象终结规则(
Finalizer Rule
): 一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()
方法的开始,对象没有完成初始化之前,是不能调用finalized()方法的。
案例说明
假设存在线程A和B,线程A先(时间上的先后)调用了setValue(1)
,然后线程B调用了同一个对象的getValue()
,那么线程B收到的返回值是什么?
我们就这段简单的代码一次分析happens-before
的规则(规则5、6、7、8 可以忽略,因为他们和这段代码毫无关系)
- 由于两个方法是由不同的线程调用,不在同一个线程中,所以肯定不满足程序次序规则;
- 两个方法都没有使用锁,所以不满足锁定规则;
- 变量不是用
volatile
修饰的,所以volatile
变量规则不满足; - 传递规则肯定不满足;
所以我们无法通过happens-before
原则推导出线程A happens-before
线程B,虽然可以确认在时间上线程A优先于线程B指定,但就是无法确认线程B获得的结果是什么,所以这段代码不是线程安全的。那么怎么修复这段代码呢?
- 把
getter/setter
方法都定义为synchronized
方法 - 把
value
定义为volatile
变量,由于setter
方法对value
的修改不依赖value
的原值,满足volatile
关键字使用场景。
六、volatile
与Java
内存模型
被
volatile
修饰的变量有2个特点:可见性 和 有序性
volatile
的内存语义:
- 当写一个
volatile
变量时,JMM
会把该线程对应的本地内存中的共享变量值立即刷新到主内存中。 - 当读一个
volatile
变量时,JMM
会把该线程对应的本地内存设置为无效,直接从主内存中读取共享变量。 - 所以
volatile
的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取。
内存屏障
是什么?
内存屏障(也称内存栅栏,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以执行此点之后的操作),避免代码重排序。内存屏障其实就是一种JVM
指令,Java
内存模型的重排规则会要求java
编译器在生成JVM
指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile
实现了java
内存模型中的可见性和有序性,但无法保证原子性。
内存屏障之前的所有写操作都要回写到主内存,内存屏障之后的所有读操作都能获得内存屏障之前的所有写操作的最新结果(实现了可见性)。
因此重排序时,不允许把内存屏障之后的指令重排序到内存屏障之前。 一句话:对一个 volatile
域的写, happens-before
于任意后续对这个 volatile
域的读,也叫写后读。
**volatile
凭什么可以保证可见性和有序性?**内存屏障 (Memory Barriers / Fences
)
JVM
中提供了四类内存屏障指令
屏障类型 | 指令示例 | 说明 |
---|---|---|
loadload() | load1;loadload;load2 | 保证load1 的读取操作在load2 及后续读取操作之前执行 |
storestore() | store1;storestore;store2 | 在store2 及其后的写操作执行前,保证store1 的写操作已经刷新到主内存中 |
loadstore() | load1;loadstore;store2 | 在store2 及其后的写操作执行前,保证load1 的读操作已经读取结束 |
storeload() | store1;storeload;load2 | 保证store1 的写操作已刷新到主内存之后,load2 及其后的读操作才能执行 |
volatile
的底层实现是通过内存屏障
happens-before
之 volatile
变量规则
第一个操作 | 第二个操作:普通读写 | 第二个操作:volatile 读 | 第二个操作:volatile 写 |
---|---|---|---|
普通读写 | 可以重排 | 可以重排 | 不可以重排 |
volatile 读 | 不可以重排 | 不可以重排 | 不可以重排 |
volatile 写 | 可以重排 | 不可以重排 | 不可以重排 |
- 当第一个操作为
volatile
读时,不论第二个操作是什么,都不能重排序。这个操作保证了volatile
读之后的操作不会被重排到volatile
读之前。 - 当第二个操作为
volatile
写时,不论第一个操作是什么,都不能重排序。这个操作保证了volatile
写之前的操作不会被重排到volatile
写之后。 - 当第一个操作为
volatile
写时,第二个操作为volatile
读时,不能重排。
-
写
- 在每个
volatile
写操作的前⾯插⼊⼀个StoreStore
屏障 - 在每个
volatile
写操作的后⾯插⼊⼀个StoreLoad
屏障
- 在每个
-
读
- 在每个
volatile
读操作的后⾯插⼊⼀个LoadLoad
屏障 - 在每个
volatile
读操作的后⾯插⼊⼀个LoadStore
屏障
- 在每个
四大屏障的插入情况
- 在每一个
volatile
写操作前面插入一个StoreStore
屏障StoreStore
屏障可以保证在volatile
写之前,其前面的所有普通写操作都已经刷新到主内存中。
- 在每一个
volatile
写操作后面插入一个StoreLoad
屏障StoreLoad
屏障的作用是避免volatile写与后面可能有的volatile
读/写操作重排序
- 在每一个
volatile
读操作后面插入一个LoadLoad
屏障LoadLoad
屏障用来禁止处理器把上面的volatile
读与下面的普通读重排序。
- 在每一个
volatile
读操作后面插入一个LoadStore
屏障LoadStore
屏障用来禁止处理器把上面的volatile
读与下面的普通写重排序。
volatile
特性
保证不同线程对这个变量进行操作时的可见性,即变量一旦改变所有线程立即可见
不加volatile,没有可见性,程序无法停止
加了volatile,保证可见性,程序可以停止
public class Demo05 {
//private static boolean flag = true;
private volatile static boolean flag = true; // 保证其他线程对这个变量的可见性
public static void main(String[] args) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() +" \t 线程进入方法体");
while (flag) {
}
System.out.println(Thread.currentThread().getName() +" \t 线程方法体执行完毕");
},"t1").start();
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = false;
System.out.println("主线程执行完毕");
}
}
线程t1
中为何看不到被主线程main
修改为false
的flag
的值?
- 主线程修改了
flag
之后没有将其刷新到主内存,所以t1
线程看不到。 - 主线程将
flag
刷新到了主内存,但是t1
一直读取的是自己工作内存中flag
的值,没有去主内存中更新获取flag
最新的值。
解决方法:使用volatile
修饰共享变量,就可以达到上面的效果,被volatile
修改的变量有以下特点:
- 线程中读取的时候,每次读取都会去主内存中读取共享变量最新的值,然后将其复制到工作内存
- 线程中修改了工作内存中变量的副本,修改之后会立即刷新到主内存
volatile
变量的读写过程
Java内存模型中定义的8种工作内存与主内存之间的原子操作 read
(读取)→load
(加载)→use
(使用)→assign
(赋值)→store
(存储)→write
(写入)→lock
(锁定)→unlock
(解锁)
read
: 作用于主内存,将变量的值从主内存传输到工作内存,主内存到工作内存load
: 作用于工作内存,将read
从主内存传输的变量值放入工作内存变量副本中,即数据加载use
: 作用于工作内存,将工作内存变量副本的值传递给执行引擎,每当JVM
遇到需要该变量的字节码指令时会执行该操作assign
: 作用于工作内存,将从执行引擎接收到的值赋值给工作内存变量,每当JVM
遇到一个给变量赋值字节码指令时会执行该操作store
: 作用于工作内存,将赋值完毕的工作变量的值写回给主内存write
: 作用于主内存,将store
传输过来的变量值赋值给主内存中的变量
由于上述只能保证单条指令的原子性,针对多条指令的组合性原子保证,没有大面积加锁,所以,JVM
提供了另外两个原子指令
lock
: 作用于主内存,将一个变量标记为一个线程独占的状态,只是写时候加锁,就只是锁了写变量的过程。unlock
: 作用于主内存,把一个处于锁定状态的变量释放,然后才能被其他线程占用。
volatile变量的复合操作不具有原子性
如i++
;
class MyNumber {
volatile int number = 0;
public void addPlusPlus() {
number++;
}
}
public class VolatileDemo {
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
for (int j = 1; j <= 1000; j++) {
myNumber.addPlusPlus();
}
}, String.valueOf(i)).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + myNumber.number);
}
}
从字节码的角度来说
原子性指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其他线程影响。
public void add(){ i++; //不具备原子性,该操作是先读取值,然后写回一个新值,相当于原来的值加上1,分3步完成 }
如果第二个线程在第一个线程读取旧值和写回新值期间读取i的域值,那么第二个线程就会与第一个线程一起看到同一个值,
并执行相同值的加1操作,这也就造成了线程安全失败,因此对于add
方法必须使用synchronized
修饰,以便保证线程安全.
多线程环境下,"数据计算"和"数据赋值"操作可能多次出现,即操作非原子。若数据在加载之后,若主内存count
变量发生修改之后,由于线程工作内存中的值在此前已经加载,从而不会对变更操作做出相应变化,即私有内存和公共内存中变量不同步,进而导致数据不一致
对于volatile
变量,JVM
只是保证从主内存加载到线程工作内存的值是最新的,也就是数据加载时是最新的。
由此可见volatile
解决的是变量读时的可见性问题,但无法保证原子性,对于多线程修改共享变量的场景必须使用加锁同步
为什么一修改就是可见,却不能保证原子性?
read-load-use
和 assign-store-write
成为了两个不可分割的原子操作,但是在use
和assign
之间依然有极小的一段真空期,有可能变量会被其他线程读取,导致写丢失一次
但是无论在哪一个时间点主内存的变量和任一工作内存的变量的值都是相等的。这个特性就导致了volatile
变量不适合参与到依赖当前值的运算,如i = i + 1; i++;
之类的那么依靠可见性的特点volatile
可以用在哪些地方呢? 通常volatile
用做保存某个状态的boolean
值或 int
值。
《深入理解Java虚拟机》提到:
由于volatile
变量只能保证可见性,在不符合以下两条规则的运算场景中,我们仍然要通过加锁来保证原子性:
- 运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值。
- 变量不需要与其他的状态变量共同参与不变约束
指令禁重排
重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段,有时候会改变程序语句的先后顺序
- 不存在数据依赖关系,可以重排序;
- 存在数据依赖关系,禁止重排序
但重排后的指令绝对不能改变原有的串行语义!这点在并发设计中必须要重点考虑!
重排序的分类和执行流程
编译器和处理器在重排序时,会遵守数据依赖性,不会改变存在依赖关系的两个操作的执行,但不同处理器和不同线程之间的数据性不会被编译器和处理器考虑,其只会作用于单处理器和单线程环境,下面三种情况,只要重排序两个操作的执行顺序,程序的执行结果就会被改变。
重排前 | 重排后 |
---|---|
int a = 1; //1 int b = 20; //2 int c = a + b; //3 | int b = 20; //1 int a = 1; //2 int c = a + b; //3 |
结论:编译器调整了语句的顺序,但是不影响程序的最终结果。 | 可以重排序 |
正确使用volatile
关键字
单一赋值可以,但是含复合运算赋值不可以(i++
之类)
volatile int a = 10
volatile boolean flag = false
状态标志,判断业务是否结束
public class Demo05 {
//private static boolean flag = true;
private volatile static boolean flag = true; // 保证其他线程对这个变量的可见性
public static void main(String[] args) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() +" \t 线程进入方法体");
while (flag) {
}
System.out.println(Thread.currentThread().getName() +" \t 线程方法体执行完毕");
},"t1").start();
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = false;
System.out.println("主线程执行完毕");
}
}
开销较低的读,写锁策略
/**
* 使用:当读远多于写,结合使用内部锁和 volatile 变量来减少同步的开销
* 理由:利用volatile保证读取操作的可见性;利用synchronized保证复合操作的原子性
*/
public class Counter {
private volatile int value;
public int getValue() {
return value; // 利用volatile保证读取操作的可见性
}
public synchronized int increment() {
return value++; // 利用synchronized保证复合操作的原子性
}
}
DCL
双端锁的发布 即单例模式的懒汉式的加锁双重判断
public class SafeDoubleCheckSingleton {
private static SafeDoubleCheckSingleton singleton;
private SafeDoubleCheckSingleton() {
}
public static SafeDoubleCheckSingleton getInstance() {
if (singleton == null) {
synchronized (SafeDoubleCheckSingleton.class) {
if (singleton == null) {
singleton = new SafeDoubleCheckSingleton();
}
}
}
return singleton;
}
}
最后总结
内存屏障是什么?是一种屏障指令,它使得CPU或编译器对屏障指令的前和后所发出的内存操作执行一个排序的约束,也叫内存栅栏或栅栏指令
凭什么我们java
写了一个volatile
关键字系统底层加入内存屏障?两者关系怎么勾搭上的?
volatile
禁重排
写操作
读操作
一句话总结
volatile
写之前的操作,都禁止重排序到volatile
之后
volatile
读之后的操作,都禁止重排序到volatile
之前
volatile
写 之后volatile读,禁止重排序
七、CAS
多线程环境不使用原子类保证线程安全(基本数据类型)
public class Demo01 {
volatile int cnt = 0;
public int getCnt() {
return cnt;
}
// 使用synchronized 保证线程安全性
public synchronized void setCnt(int cnt) {
this.cnt = cnt;
}
}
多线程环境 使用原子类保证线程安全(基本数据类型)
public class Demo02 {
AtomicInteger atomicInteger = new AtomicInteger();
public int getAtomicInteger() {
return atomicInteger.get();
}
public void setAtomicInteger() {
atomicInteger.getAndIncrement();
}
}
什么是CAS
?
CAS
:compare and swap
,比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。
执行CAS
操作的时候,将内存位置的值与预期原值比较:如果相匹配,那么处理器会自动将该位置值更新为新值,如果不匹配,处理器不做任何操作,多个线程同时执行CAS
操作只有一个会成功。
CAS
是JDK
提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。
它是非阻塞的且自身原子性,也就是说它效率更高且通过硬件保证,说明它更可靠。
CAS
是一条CPU
的原子指令(cmpxchg
指令),不会造成所谓的数据不一致问题,Unsafe
提供的CAS
方法(如compareAndSwapXXX
)底层实现即为CPU指令cmpxchg
。
执行cmpxchg
指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行CAS
操作,也就是说CAS
的原子性实际上是CPU
实现的, 其实在这一点上还是有排他锁的,只是比起用synchronized
, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好。
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
System.out.println(atomicInteger.compareAndSet(1, 2022) + " \t" + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(1, 111) + " \t" + atomicInteger.get());
}
CAS
源码分析
compareAndSet()
方法的源代码:
var1
:表示要操作的对象
var2
:表示要操作对象中属性地址的偏移量
var4
:表示需要修改的期望值,也就是被替换的值
var5/var6
:表示需要修改为的新值
// compareAndSet方法
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// native方法
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
底层原理-Unsafe
类
Unsafe
是CAS
的核心类,由于Java
方法无法直接访问底层系统,需要通过本地(native
)方法来访问,Unsafe
相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe
类存在于sun.misc
包中,其内部方法操作可以像C
的指针一样直接操作内存,因为Java中CAS
操作的执行依赖于Unsafe
类的方法。 注意Unsafe
类中的所有方法都是native
修饰的,也就是说Unsafe
类中的方法都直接调用操作系统底层资源执行相应任务- 变量
valueOffset
,表示该变量值在内存中的偏移地址,因为Unsafe
就是根据内存偏移地址获取数据的。 - 变量
value
用volatile
修饰,保证了多线程之间的内存可见性。
// compareAndSet方法
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
AtomicInteger
类为什么能保证原子性?
CAS
:它是一条CPU
并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。 AtomicInteger
类主要利用 CAS
(compare and swap
) + volatile
和 native
方法来保证原子操作,从而避免 synchronized
的高开销,执行效率大为提升。
public final int getAndIncrement() {
return unsafe.getAndAddInt方法体(this, valueOffset, 1);
}
// getAndAddInt方法体
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
// compareAndSwapInt方法体
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
假设线程A
和线程B
两个线程同时执行getAndAddInt
操作(分别跑在不同CPU
上):
AtomicInteger
里面的value
原始值为3,即主内存中AtomicInteger
的value
为3,根据JMM
模型,线程A
和线程B
各自持有一份值为3的value
的副本分别到各自的工作内存。- 线程
A
通过getIntVolatile(var1, var2)
拿到value
值3,这时线程A
被挂起。 - 线程
B
也通过getIntVolatile(var1, var2)
方法获取到value
值3,此时刚好线程B没有被挂起并执行compareAndSwapInt
方法比较内存值也为3,成功修改内存值为4,线程B
打完收工,一切OK。 - 这时线程
A
恢复,执行compareAndSwapInt
方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A
线程本次修改失败,只能重新读取重新来一遍了。 - 线程
A
重新获取value
值,因为变量value
被volatile
修饰,所以其它线程对它的修改,线程A
总是能够看到,线程A
继续执行compareAndSwapInt
进行比较替换,直到成功。
引用类型
@Getter
@ToString
@AllArgsConstructor
class User {
private String userName;
private int age;
}
public class AtomicReferenceDemo {
public static void main(String[] args) {
User z3 = new User("z3", 24);
User li4 = new User("li4", 26);
AtomicReference<User> atomicReferenceUser = new AtomicReference<>();
atomicReferenceUser.set(z3);
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
}
}
CAS
缺点
- 循环事件长,开销大
ABA
问题:使用version
进行解决
在某一时刻,线程x
和线程y
都获取到了value
为A
的值,此时x
被挂起,y
在运行过程中,先是将A
改变为B
,接着它又将B
改变为A
,此时x
开始操作,value
的内容依然是A
,所以x
可以进行操作并成功修改,虽然这个操作成功了,但这个过程是有问题的。
解决办法:加入版本号,每次操作都记录一个版本号,根据版本号来判断
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100, 1);
public static void main(String[] args) {
new Thread(() -> {
atomicInteger.compareAndSet(100, 101);
atomicInteger.compareAndSet(101, 100);
}, "t1").start();
new Thread(() -> {
//暂停一会儿线程
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
;
System.out.println(atomicInteger.compareAndSet(100, 2019) + "\t" + atomicInteger.get());
}, "t2").start();
//暂停一会儿线程,main彻底等待上面的ABA出现演示完成。
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("====================ABA解决办法=====================");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//1
//暂停一会儿线程,
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 2次版本号:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 3次版本号:" + atomicStampedReference.getStamp());
}, "t3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//1
//暂停一会儿线程,获得初始值100和初始版本号1,故意暂停3秒钟让t3线程完成一次ABA操作产生问题
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t" + result + "\t" + atomicStampedReference.getReference());
}, "t4").start();
}
}
总结
你只需要记住:CAS
是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性
实现方式是基于硬件平台的汇编指令,在intel
的CPU
中(X86
机器上),使用的是汇编指令cmpxchg
指令。
核心思想就是:比较要更新变量的值V
和预期值E
(compare
),相等才会将V
的值设为新值N
(swap
)如果不相等自旋再来。
八、原子操作类
共有如下16个原子操作类:下面会进行分组讲解
AtomicBoolean
AtomicInteger
AtomicIntegerArray
AtomicIntegerFieldUpdater
AtomicLong
AtomicLongArray
AtomicLongFieldUpdater
AtomicMarkableReference
AtomicReference
AtomicReferenceArray
AtomicReferenceFieldUpdater
AtomicStampedReference
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
基本类型原子类
AtomicInteger
AtomicBoolean
AtomicLong
常用API
public final int get() // 获取当前的值
public final int getAndSet(int newValue)// 获取当前的值,并设置新的值
public final int getAndIncrement()// 获取当前的值,并自增
public final int getAndDecrement() // 获取当前的值,并自减
public final int getAndAdd(int delta) // 获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) // 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
案例:注意countDownLatch
类的使用
@Getter
class TestNumber {
private AtomicInteger atomicInteger = new AtomicInteger();
public void add() {
atomicInteger.incrementAndGet();
}
}
public class CASDemo01 {
public static void main(String[] args) throws InterruptedException {
TestNumber testNumber = new TestNumber();
CountDownLatch cnt = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 5000; j++) {
testNumber.add();
}
} finally {
cnt.countDown();
}
}, String.valueOf(i)).start();
}
cnt.await();
System.out.println("最终结果为 => " + testNumber.getAtomicInteger().get());
}
}
数组类型原子类
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
案例
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
System.out.println();
System.out.println();
System.out.println();
int tmpInt = 0;
tmpInt = atomicIntegerArray.getAndSet(0, 1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
atomicIntegerArray.getAndIncrement(1);
atomicIntegerArray.getAndIncrement(1);
tmpInt = atomicIntegerArray.getAndIncrement(1);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
引用类型原子类
AtomicReference
AtomicStampedReference
- 携带版本号的引用类型原子类,可以解决
ABA
问题 - 解决修改过几次
- 状态戳原子引用
AtomicMarkableReference
- 原子更新带有标记位的引用类型对象
- 解决是否修改过 它的定义就是将状态戳简化为
true|false
– 类似一次性筷子
public class CASDemo01 {
public static void main(String[] args) throws InterruptedException {
Test z3 = new Test("z3", 24);
Test li4 = new Test("li4", 26);
AtomicReference<Test> atomicReferenceUser = new AtomicReference<>();
atomicReferenceUser.set(z3);
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
}
}
@Getter
@ToString
@AllArgsConstructor
class Test {
String userName;
int age;
}
对象的属性修改原子类
以一种线程安全的方式操作非线程安全对象内的某些字段
更新的对象属性必须使用
public volatile
修饰符。因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法
newUpdater()
创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater
: 原子更新对象中int
类型字段的值
AtomicLongFieldUpdater
: 原子更新对象中Long
类型字段的值
AtomicReferenceFieldUpdater
: 原子更新引用类型字段的值
案例
public class CASDemo01 {
public static void main(String[] args) {
BankAccount bankAccount = new BankAccount();
for (int i = 1; i <= 1000; i++) {
int finalI = i;
new Thread(() -> {
bankAccount.transferMoney(bankAccount);
}, String.valueOf(i)).start();
}
//暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(bankAccount.money);
}
}
class BankAccount {
private String bankName = "CCB"; // 银行
public volatile int money = 0; // 钱数
AtomicIntegerFieldUpdater<BankAccount> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");
//不加锁+性能高,局部微创
public void transferMoney(BankAccount bankAccount) {
accountAtomicIntegerFieldUpdater.incrementAndGet(bankAccount);
}
}
AtomicReferenceFieldUpdater
class MyVar {
public volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyVar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");
public void init(MyVar myVar) {
if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\t" + "---init.....");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "---init.....over");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "------其它线程正在初始化");
}
}
}
/**
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
*/
public class AtomicIntegerFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
MyVar myVar = new MyVar();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myVar.init(myVar);
}, String.valueOf(i)).start();
}
}
}
原子操作增强类
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
关于阿里巴巴java
开发手册中:volatile解决多线程内存不可见问题,对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
说明:如果是count++操作,使用如下类实现:AtomicInteger count = new AtomicInteger(); count.addAndGet(1);
如果是JDK8
,推荐使用LongAdder
对象,比AtomicLong
性能更好(减少乐观锁的重试次数)
LongAdder
只能用来计算加法,且从零开始计算
LongAccumulator
提供了自定义的函数操作
使用方法
LongAdder longAdder = new LongAdder();
public void addByLongAdder() {
longAdder.increment();
}
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
public void addByLongAccumulator() {
longAccumulator.accumulate(1);
}
性能对比
class ClickNumberNet {
int number = 0;
public synchronized void clickBySync() {
number++;
}
AtomicLong atomicLong = new AtomicLong(0);
public void clickByAtomicLong() {
atomicLong.incrementAndGet();
}
LongAdder longAdder = new LongAdder();
public void clickByLongAdder() {
longAdder.increment();
}
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}
}
public class CASClassDemo02 {
public static void main(String[] args) throws InterruptedException {
ClickNumberNet clickNumberNet = new ClickNumberNet();
long startTime;
long endTime;
CountDownLatch countDownLatch = new CountDownLatch(50);
CountDownLatch countDownLatch2 = new CountDownLatch(50);
CountDownLatch countDownLatch3 = new CountDownLatch(50);
CountDownLatch countDownLatch4 = new CountDownLatch(50);
startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickBySync();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
endTime = System.currentTimeMillis();
System.out.println("synchronized 共花费: " + (endTime - startTime) + " 毫秒" + "\t result: " + clickNumberNet.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("AtomicLong 共花费: " + (endTime - startTime) + " 毫秒" + "\t result: " + clickNumberNet.atomicLong);
startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("LongAdder 共花费: " + (endTime - startTime) + " 毫秒" + "\t result: " + clickNumberNet.longAdder.sum());
startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("LongAccumulator 共花费: " + (endTime - startTime) + " 毫秒" + "\t result: " + clickNumberNet.longAccumulator.longValue());
}
}
对比结果
synchronized 共花费: 2093 毫秒 result: 50000000
AtomicLong 共花费: 417 毫秒 result: 50000000
LongAdder 共花费: 94 毫秒 result: 50000000
LongAccumulator 共花费: 83 毫秒 result: 50000000
源码分析
这段比较难,我思考了很久,觉得单纯的笔记没有老师解释的清楚,建议直接观看视频,反复观看。
简单来讲:LongAdder
在无竞争的情况,跟AtomicLong
一样,对同一个base
进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells
,将一个value
拆分进这个数组cells
。多个线程需要同时对value
进行操作时候,可以对线程id
进行hash
得到hash
值,再根据hash
值映射到这个数组cells
的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells
的所有值和无竞争值base
都加起来作为最终结果。
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
longAccumulate(x, function, uncontended);
}
}
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
九、ThreadLocal
简介
ThreadLocal
提供线程局部变量。这些变量与正常的变量不同,因为每一个线程在访问ThreadLocal
实例的时候(通过其get
或set
方法)都有自己的、独立初始化的变量副本。ThreadLocal
实例通常是类中的私有静态字段,使用它的目的是希望将状态(例如,用户ID或事务ID
)与线程关联起来。
案例1
5个销售卖房子,集团高层只关心销售总量的准确统计数,按照总销售额统计,方便集团公司发奖金
class House {
int saleCount = 0;
public synchronized void saleHouse() {
++saleCount;
}
}
public class Demo01 {
public static void main(String[] args) {
House house = new House();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
int size = new Random().nextInt(5) + 1;
System.out.println(size);
for (int j = 1; j <= size; j++) {
house.saleHouse();
}
}, String.valueOf(i)).start();
}
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "共计卖出多少套:" + house.saleCount);
}
}
案例2
某房产中介销售都有自己的销售额指标,自己专属于自己的,不和别人掺和
阿里巴巴手册:必须回收自定义的
ThreadLocal
变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的ThreadLocal
变量,可能会影像后序业务逻辑和造成内存泄露等问题。尽量在代理中使用try-finally
块进行回收。
class House {
int saleCount = 0;
public synchronized void saleHouse() {
++saleCount;
}
//java8之后带来的新写法
ThreadLocal<Integer> saleVolume = ThreadLocal.withInitial(() -> 0);//withInitial当前常被用来初始化
public void saleVolumeByThreadLocal() {
saleVolume.set(1 + saleVolume.get());
}
}
public class Demo01 {
public static void main(String[] args) {
House house = new House();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
int size = new Random().nextInt(5) + 1;
for (int j = 1; j <= size; j++) {
house.saleHouse();
house.saleVolumeByThreadLocal();
}
System.out.println(Thread.currentThread().getName() + "\t" + "号销售卖出:" + house.saleVolume.get());
}, String.valueOf(i)).start();
}
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "共计卖出多少套:" + house.saleCount);
}
}
根据阿里规范,需要对自定义的ThreadLocal
进行回收,否则容易造成内存泄漏和业务逻辑问题(因为线程池中的线程会复用)
改写案例2
MyData myData = new MyData();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
for(int i = 0;i < 10;i ++){
threadPool.submit(()->{
try {
Integer beforeInt = myData.threadLocalField.get();
myData.add();
Integer afterInt = myData.threadLocalField.get();
System.out.println(Thread.currentThread().getName()+"\t"+"beforeInt"+beforeInt+"\t afterInt"+afterInt);
} finally {
myData.threadLocalField.remove(); // 主动调用remove方法
}
});
}
} catch (Exception e) {
e.printStackTrace();
}finally {
threadPool.shutdown();
}
案例总结
-
因为每个
Thread
内有自己的实例副本并且该副本只由当前线程自己使用 -
既然其他
Thread
不可访问,那就不存在多线程间共享的问题。 -
统一设置初始值,但是每个线程对这个值的修改都是各自线程相互独立的
一句话:如何才能不争抢?
- 假如
synchronized
或者Lock
控制资源的访问顺序 - 利用
ThreadLocal
人手一份,大家各自安好,没必要抢夺
ThreadLocal
源码解析
Thread
,ThreadLocal
,ThreadLocalMap
关系
- 根据官方
API
,Thread
是程序中执行的线程;ThreadLocal
类提供线程局部变量。 - 先打开
Thread.java
类,发现每个Thread
类里面有一个ThreadLocal
类
而ThreadLocalMap
是ThreadLocal
的一个静态内部类
总结:
threadLocalMap
实际上就是一个以threadLocal
实例为key
,任意对象为value
的Entry
对象。
当我们为threadLocal
变量赋值,实际上就是以当前threadLocal
实例为key
,值为value
的Entry
往这个threadLocalMap
中存放
ThreadLocalMap
从字面上就可以看出这是一个保存ThreadLocal
对象的map
(其实是以ThreadLocal
为Key
),不过是经过了两层包装的
ThreadLocal
对象:
- 第一层包装是使用
WeakReference<ThreadLocal<?>>
将ThreadLocal
对象变成一个弱引用的对象。 - 第二层包裝是定义了一个专门的类
Entry
来扩展WeakReference<ThreadLocals?>>
。
java
中的四种引用解析
Reference
是强引用
- 当内存不足,
JVM
开始垃圾回收,对于强引用的对象,就算是出现了OOM
也不会对该对象进行回收,死都不收。 - 强引用是我们最常见的普通对象引用,只要还有强引用指向一个对象,就能表明对象还“活着”,垃圾收集器不会碰这种对象。在
Java
中最常见的就是强引用,把一个对象赋给一个引用变量,这个引用变量就是一个强引用。当一个对象被强引用变量引用时,它处于可达状态,它是不可能被垃圾回收机制回收的,即使该对象以后永远都不会被用到JVM
也不会回收。因此强引用是造成Java
内存泄漏的主要原因之一。 - 对于一个普通的对象,如果没有其他的引用关系,只要超过了引用的作用域或者显式地将相应(强)引用赋值为
null
,一般认为就是可以被垃圾收集的了(当然具体回收时机还是要看垃圾收集策略)。
MyObject myObject = new MyObject();
System.out.println("gc before"+myObject);
myObject = null;//new 一个对象是一个强引用,如果不把他指为null,垃圾回收回收不了他
System.gc();//人工开启gc 一般不用
System.out.println("gc after "+myObject);
SoftReference
是软引用
软引用是一种相对强引用弱化了一些的引用,需要用java.lang.ref.SoftReference
类来实现,可以让对象豁免一些垃圾收集。
对于只有软引用的对象来说,
-
当系统内存充足时它 不会 被回收,
-
当系统内存不足时它 会 被回收。
软引用通常用在对内存敏感的程序中,比如高速缓存就有用到软引用,内存够用的时候就保留,不够用就回收!
在运行案例之前,需要设置JVM
启动参数,至于怎么设置,这里就不作介绍了
参数:-Xms10m -Xmx10m
(如果想要理解这个含义,可以阅读深入理解java
虚拟机第三版)
SoftReference<MyObject> softReference = new SoftReference<>(new MyObject());
System.gc();
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("-------gc after内存够用-------"+softReference.get());
try {
byte[] bytes = new byte[20 * 1024 * 1024];
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("---------gc after内存不够-------"+softReference.get());
}
WeakReference
是弱引用
弱引用需要用java.lang.ref.WeakReference
类来实现,它比软引用的生存期更短,
对于只有弱引用的对象来说,只要垃圾回收机制一运行,不管JVM
的内存空间是否足够,都会回收该对象占用的内存。
WeakReference<MyObject> weakReference = new WeakReference<>(new MyObject());
System.out.println("-----gc before 内存够用 "+ weakReference.get());
System.gc();
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
//暂停几秒钟线程
System.out.println("----gc after内存够用 "+weakReference.get());
适用场景:
假如有一个应用需要读取大量的本地图片:
- 如果每次读取图片都从硬盘读取则会严重影响性能,
- 如果一次性全部加载到内存中又可能造成内存溢出。
设计思路是:用一个HashMap
来保存图片的路径和相应图片对象关联的软引用之间的映射关系,在内存不足时,JVM
会自动回收这些缓存图片对象所占用的空间,从而有效地避免了OOM
的问题。
Map<String, SoftReference<Bitmap>> imageCache = new HashMap<String, SoftReference<Bitmap>>();
PhantomReference
是虚引用
- 虚引用必须和引用队列 (
ReferenceQueue
)联合使用 - 虚引用需要
java.lang.ret.PhantomReterence
类来实现,顾名思义, 就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有院引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收,它不能单独使用也不能通过它访问对象,虚引用必须和引用队列(ReferenceQueue
)联合使用。 PhantomReference
的get方法总是返回null
,虚引用的主要作用是跟踪对象被垃圾回收的状态。仅仅是提供了一和确保对象被finalize
以后,做某些事情的通知机制。PhantomReference
的get
方法总是返回null
,因此无法访问对应的引用对象。- 处理监控通知使用,换句话说,设置虚引用关联对象的唯一目的,就是在这个对象被收集器回收的时候收到一个系统通知或者后续添加进一步的处理,用来实现比
finalize
机制更灵活的回收操作。
MyObject myObject = new MyObject();
ReferenceQueue<MyObject> referenceQueue = new ReferenceQueue<>();
PhantomReference<MyObject> phantomReference = new PhantomReference<>(myObject, referenceQueue);
// System.out.println(phantomReference.get());//这里就是个null--虚引用的get()就是null
List<byte[]> list = new ArrayList<>();
new Thread(() -> {
while (true)//模拟一个无限循环
{
list.add(new byte[1 * 1024 * 1024]);
try { TimeUnit.MILLISECONDS.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(phantomReference.get());
}
},"t1").start();
new Thread(() -> {
while (true)
{
Reference<? extends MyObject> reference = referenceQueue.poll();
if (reference != null) {
System.out.println("有虚对象加入队列了");
}
}
},"t2").start();
为什么源码中使用弱引用?
当线程t1
方法执行完毕后,栈帧销毁强引用 tl
也就没有了。但此时线程的ThreadLocalMap
里某个entry
的key
引用还指向这个对象
- 若这个key引用是强引用,就会导致key指向的
ThreadLocal
对象及v指向的对象不能被gc
回收,造成内存泄漏; - 若这个
key
引用是弱引用,就大概率会减少内存泄漏的问题。使用弱引用,就可以使ThreadLocal
对象在方法执行完毕后顺利被回收且Entry
的key
引用指向为null
。
线程池线程复用情况下的问题:key
值为null
-
当我们为
threadLocal
变量赋值,实际上就是当前的Entry(threadLocal
实例为key
,值为value
)往这个threadLocalMap
中存放。Entry
中的key
是弱引用,当threadLocal
外部强引用被置为null
(tl=null
),那么系统GC
的时候,根据可达性分析,这个threadLocal
实例就没有任何一条链路能够引用到它,这个ThreadLocal
势必会被回收,这样一来,ThreadLocalMap
中就会出现key
为null
的Entry,就没有办法访问这些key
为null
的Entry
的value
,如果当前线程再迟迟不结束的话(这个tl
就不会被干掉),这些key
为null
的Entry
的value
就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value
永远无法回收,造成内存泄漏。 -
如果当前
thread
运行结束,threadLocal
,threadLocalMap
,Entry
没有引用链可达,在垃圾回收的时候都会被系统进行回收。 -
但在实际使用中我们有时候会用线程池去维护我们的线程,比如在
Executors.newFixedThreadPool()
时创建线程的时候,为了复用线程是不会结束的,所以threadLocal
内存泄漏就值得我们小心
set
、get
方法会去检查所有键为null
的Entry
对象,通过调用expungeStaleEntry
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
总结
- 一定要进行初始化避免空指针问题
ThreadLocal.withInitial(()- > 初始化值)
; - 建议把
ThreadLocal
修饰为static
- 用完记得手动
remove
ThreadLocal
并不解决线程间共享数据的问题ThreadLocal
适用于变量在线程间隔离且在方法间共享的场景ThreadLocal
通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题- 每个线程持有一个只属于自己的专属
Map
并维护了ThreadLocal
对象与具体实例的映射,该Map
由于只被持有它的线程访问,故不存在线程安全以及锁的问题 ThreadLocalMap
的Entry
对ThreadLocal
的引用为弱引用,避免了ThreadLocal
对象无法被回收的问题都会通过expungeStaleEntry
,cleanSomeSlots
,replaceStaleEntry
这三个方法回收键为null
的Entry
- 群雄逐鹿起纷争,人各一份天下安
十、java
对象内存布局和对象头
在
HotSpot
虚拟机里,对象在堆内存中的存储布局可以划分为三个部分:对象头(Header
)、实例数据(Instance Data
)和对其填充(Padding
)
对象在堆内存中的存储布局
对象内部结构分为:对象头、实例数据、对齐填充(保证8个字节的倍数)。对象头分为对象标记(
markOop
)和类元信息(klassOop
),类元信息存储的是指向该对象类元数据(klass
)的首地址
对象标记Mark Word
在64位系统中,Mark Word
占了8字节,类型指针占了8字节,一共是16字节。
默认存储对象的 HashCode
、分代年龄和锁标志位。这些信息都是与对象自身定义无关的数据,所以Mark Word
被设计成一个非固定的数据结构,以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word
里存储的数据会随着锁标志位的变化而变化
GC
年龄采用4位bit存储,最大为15,例如MaxTenuringThreshold
参数默认值就是15, -XX:MaxTenuringThreshold=16
JVM
会启动失败
类元信息(类指针)
所谓的类元信息(类型指针)其实就可以说是模板
对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
对象头有多大?在64位系统中Mark Word
占了8个字节,类型指针占了8个字节,一共是16个字节。
实例数据
存放类的属性(
Field
)信息,包括父类的属性信息
对其填充
保证是8字节的倍数:虚拟机要求对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐这部分内存按8字节补充对齐。
下面的类,初始是12字节 + 4字节的
int
+ 1字节的boolean
为 17字节,补充7字节
class User {
private int id;
private boolean status;
}
pom
文件
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>
输出结果
com.ljq.demo.JUC.ObjectHeaderInfo.User object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 43 c1 00 f8 (01000011 11000001 00000000 11111000) (-134168253)
12 4 int User.id 0
16 1 boolean User.status false
17 7 (loss due to the next object alignment)
Instance size: 24 bytes
Space losses: 0 bytes internal + 7 bytes external = 7 bytes total
类型 | 描述 |
---|---|
OFFSET | 偏移量,也就是到这个字段位置所占用的byte 字节 |
SIZE | 后面类型的字节大小 |
TYPE | 是Class 中定义的类型 |
DESCRIPTION | DESCRIPTION 是类型的描述 |
VALUE | VALUE 是TYPE 在内存中的值 |
压缩指针
这也就解释了为什么前面的类型指针是4个字节,节约了内存空间
查看当前JVM
运行参数的指令。-XX:+UseCompressedClassPointers
默认开启了
java -XX:+PrintCommandLineFlags -version
-XX:InitialHeapSize=257799168 -XX:MaxHeapSize=4124786688 -XX:+PrintCommandLineFlags -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:-UseLargePagesIndividualAllocation -XX:+UseParallelGC
开启压缩的对象信息
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
-XX:-UseCompressedClassPointers
关闭压缩
public class Demo01 {
public static void main(String[] args) {
User user1 = new User();
Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
输出结果
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 00 1c a6 1c (00000000 00011100 10100110 00011100) (480648192)
12 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
Instance size: 16 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total
十一、synchronized
与锁升级
用锁会带来性能的下降,无锁能够基于线程并行提升程序性能,但是会带来并发问题。
锁升级的过程:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
synchronized
锁:由对象头中的Mark Word
根据锁标志位的不同而被复用及锁升级策略在
java6
之前,只有重量级锁,在并发量少的情况下,会造成用户态和内核态无意义的切换
java
的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统介入,需要在户态与核心态之间切换,这种切换会消耗大量的系统资源,因为用户态与内核态都有各自专用的内存空间,专用的寄存器等,用户态切换至内核态需要传递给许多变量、参数给内核,内核也需要保护好用户态在切换时的一些寄存器值、变量等,以便内核态调用结束后切换回用户态继续工作。
在Java
早期版本中,synchronized
属于重量级锁,效率低下,因为监视器锁(monitor
)是依赖于底层的操作系统的Mutex Lock
来实现的,挂起线程和恢复线程都需要转入内核态去完成,阻塞或唤醒一个Java
线程需要操作系统切换CPU
状态来完成,这种状态切换需要耗费处理器时间,如果同步代码块中内容过于简单,这种切换的时间可能比用户代码执行的时间还长”,时间成本相对较高,这也是为什么早期的synchronized
效率低的原因 Java 6
之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁
为什么每个对象都可以成为一个锁?
Monitor
可以理解为一种同步工具,也可理解为一种同步机制,常常被描述为一个Java
对象。Java
对象是天生的Monitor
,每一个Java
对象都有成为Monitor
的潜质,因为在Java
的设计中 ,每一个Java
对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor
锁。
private static Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (lock) {
// lock 成为了一把锁
}
},"t1").start();
}
Monitor
(监视器锁)
Monitor
的本质是依赖于底层操作系统的Mutex Lock
实现,操作系统实现线程之间的切换需要从用户态到内核态的转换,成本非常高。
Mutex Lock Monitor
是在jvm
底层实现的,底层代码是c++
。本质是依赖于底层操作系统的Mutex Lock
实现,操作系统实现线程之间的切换需要从用户态到内核态的转换,状态转换需要耗费很多的处理器时间成本非常高。所以synchronized
是Java
语言中的一个重量级操作。
Monitor
与java
对象以及线程是如何关联 ?
注意:以下描述在java6
之前
-
如果一个
java
对象被某个线程锁住,则该java
对象的Mark Word
字段中LockWord
指向monitor
的起始地址 -
Monitor
的Owner
字段会存放拥有相关联对象锁的线程id
Mutex Lock
的切换需要从用户态转换到核心态中,因此状态转换需要耗费很多的处理器时间。
java6
开始,优化synchronized
Java 6
之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁
需要有个逐步升级的过程,别一开始就捅到重量级锁
synchronized
锁种类及升级步骤
多线程访问大致分为以下三种情况
- 只有一个线程来访问,有且仅有一个
- 有2个线程交替访问
- 竞争激烈,多个线程同时抢夺
升级流程
synchronized
用的锁是存在Java
对象头里的Mark Word
中锁升级功能主要依赖MarkWord
中锁标志位和释放偏向锁标志位
无锁状态
代码
Object o = new Object();
System.out.println("10进制 hash码:" + o.hashCode());
System.out.println("16进制 hash码:" + Integer.toHexString(o.hashCode()));
System.out.println("2进制 hash码:" + Integer.toBinaryString(o.hashCode()));
System.out.println(ClassLayout.parseInstance(o).toPrintable());
输出
10进制 hash码:1313922862
16进制 hash码:4e50df2e
2进制 hash码:1001110010100001101111100101110
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 2e df 50 (00000001 00101110 11011111 01010000) (1356803585)
4 4 (object header) 4e 00 00 00 (01001110 00000000 00000000 00000000) (78)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
这里进行说明:在打印出的Mark Word
信息中,我们是这样进行查看的,如下图所示
可以看到 单8位中最后一段是001,代表的是无锁的状态
而16进制hash
值对应则是从后往前看的结果 4e 50 df 2e
, 请对照下图value
开头部分
2进制hash
同16进制一样
偏向锁
当一段同步代码一直被同一个线程多次访问,由于只有一个线程那么该线程在后续访问时便会自动获得锁
同一个老顾客来访,直接老规矩行方便
Hotspot
的作者经过研究发现,大多数情况下:多线程的情况下,锁不仅不存在多线程竞争,还存在锁由同一线程多次获得的情况,
偏向锁就是在这种情况下出现的,它的出现是为了解决只有在一个线程执行同步时提高性能。
在上面提到的
Mark Word
t头中,有1bit
的偏向锁位,0代表不是偏向锁,1代表是偏向锁
持有偏向锁
理论落地: 在实际应用运行过程中发现,锁总是同一个线程持有,很少发生竞争,也就是说锁总是被第一个占用他的线程拥有,这个线程就是锁的偏向线程。
那么只需要在锁第一次被拥有的时候,记录下偏向线程ID
。这样偏向线程就一直持有着锁(后续这个线程进入和退出这段加了同步锁的代码块时,不需要再次加锁和释放锁。而是直接比较对象头里面是否存储了指向当前线程的偏向锁)。 如果相等表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了,直到竞争发生才释放锁。以后每次同步,检查锁的偏向线程ID
与当前线程ID
是否一致,如果一致直接进入同步。无需每次加锁解锁都去CAS
更新对象头。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。
假如不一致意味着发生了竞争,锁已经不是总是偏向于同一个线程了,这时候可能需要升级变为轻量级锁,才能保证线程间公平竞争锁。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程是不会主动释放偏向锁的。
技术实现: 一个synchronized
方法被一个线程抢到了锁时,那这个方法所在的对象就会在其所在的Mark Word
中将偏向锁修改状态位,同时还 会有占用前54
位来存储线程指针作为标识。若该线程再次访问同一个synchronized
方法时,该线程只需去对象头的Mark Word
中去判断一下是否有偏向锁指向本身的ID
,无需再进入 Monitor
去竞争对象了。
偏向锁的操作不用直接捅到操作系统,不涉及用户到内核转换,不必要直接升级为最高级,我们以一个account
对象的“对象头”为例
假如有一个线程执行到synchronized
代码块的时候,JVM
使用CAS
操作把线程指针ID
记录到Mark Word
当中,并修改标偏向标示,标示当前线程就获得该锁。锁对象变成偏向锁(通过CAS
修改对象头里的锁标志位),字面意思是“偏向于第一个获得它的线程”的锁。执行完同步代码块后,线程并不会主动释放偏向锁。
结论:JVM
不用和操作系统协商设置Mutex
(争取内核),它只需要记录下线程ID
就标示自己获得了当前锁,不用操作系统接入。 上述就是偏向锁:在没有其他线程竞争的时候,一直偏向偏心当前线程,当前线程可以一直执行。
JVM
中查看偏向锁配置信息的命令
java -XX:+PrintFlagsInitial | grep BiasedLock*
通过上图中,我们发现实际上偏向锁在1.6之后是默认开启的,但启动时间上是由4秒钟的延迟。
-XX:BiasedLockingStartupDelay=0 // 立即启动
-XX:+UseBiasedLocking // 开启偏向锁
-XX:-UseBiasedLocking // 关闭偏向锁
偏向锁的撤销
当有另外线程逐步来竞争锁的时候,就不能再使用偏向锁了,要升级为轻量级锁
竞争线程尝试
CAS
更新对象头失败,会等待到全局安全点(此时不会执行任何代码)撤销偏向锁。偏向锁使用一种等到竞争出现才释放锁的机制,只有当其他线程竞争锁时,持有偏向锁的原来线程才会被撤销。 撤销需要等待全局安全点(该时间点上没有字节码正在执行),同时检查持有偏向锁的线程是否还在执行:
① 第一个线程正在执行
synchronized
方法(处于同步块),它还没有执行完,其它线程来抢夺,该偏向锁会被取消掉并出现锁升级。 此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程会进入自旋等待获得该轻量级锁。② 第一个线程执行完成
synchronized
方法(退出同步块),则将对象头设置成无锁状态并撤销偏向锁,重新偏向 。
java15
逐步废弃偏向锁
轻量级锁
多线程竞争,但是任意时刻最多只有一个线程竞争,即不存在锁竞争太过激烈的情况,也就没有线程阻寨
有线程来参与锁的竞争,但是获取锁的冲突时间极短
本质就是自选锁
CAS
轻量级锁是为了在线程近乎交替执行同步块时提高性能。 主要目的: 在没有多线程竞争的前提下,通过CAS
减少重量级锁使用操作系统互斥量产生的性能消耗,说白了先自旋再阻塞。 升级时机: 当关闭偏向锁功能或多线程竞争偏向锁会导致偏向锁升级为轻量级锁
假如线程A
已经拿到锁,这时线程B
又来抢该对象的锁,由于该对象的锁已经被线程A
拿到,当前该锁已是偏向锁了。 而线程B
在争抢时发现对象头Mark Word
中的线程ID
不是线程B自己的线程ID
(而是线程A
),那线程B
就会进行CAS
操作希望能获得锁。 此时线程B
操作中有两种情况: 如果锁获取成功,直接替换Mark Word
中的线程ID
为B
自己的ID
(A → B),重新偏向于其他线程(即将偏向锁交给其他线程,相当于当前线程"被"释放了锁),该锁会保持偏向锁状态,A
线程Over
,B
线程上位;
如果锁获取失败,则偏向锁升级为轻量级锁,此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程B
会进入自旋等待获得该轻量级锁。
自旋的次数
自适应,自适应意味着自旋的次数不是固定不变的
而是根据:同一个锁上一次自旋的时间,拥有锁线程的状态来决定。
轻量锁与偏向锁的区别和不同
- 争夺轻量级锁失败时,自旋尝试抢占锁
- 轻量级锁每次退出同步块都需要释放锁,而偏向锁是在竞争发生时才释放锁
重锁
有大量的线程参与锁的竞争,冲突性很高
JIT
编译器对锁的优化
Just In Time Compiler
,一般翻译为即时编译器
锁消除
static Object objectLock = new Object();//正常的
public void m1() {
// 因为每次都是new, 所以每个线程锁住的对象都是不同的,相当于没加锁
// 锁消除,JIT会无视它,synchronized(对象锁)不存在了。不正常的
Object o = new Object();
synchronized (o) {
System.out.println("-----hello " + "\t" + o.hashCode() + "\t" + objectLock.hashCode());
}
}
public static void main(String[] args) {
LockClearUPDemo demo = new LockClearUPDemo();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
demo.m1();
}, String.valueOf(i)).start();
}
}
锁粗化
static Object objectLock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 假如方法中首尾相接,前后相邻的都是同一个锁对象,那JIT编译器就会把这几个synchronized块合并成一个大块,
// 加粗加大范围,一次申请锁使用即可,避免次次的申请和释放锁,提升了性能
synchronized (objectLock) {
System.out.println("11111");
}
synchronized (objectLock) {
System.out.println("22222");
}
synchronized (objectLock) {
System.out.println("33333");
}
}, "t1").start();
}
十二、AbstractQueuedSynchronizer
(AQS
)
意味抽象队列同步器,这里的源码比较底层,建议多听老师讲解,我这里只是记录了大概思路,供自己回想。
是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC
体系的基石,通过内置的FIFO
队列来完成资源获取线程的排队工作,并通过一个int
类变量表示持有锁的状态
CLH:Craig、Landin and Hagersten
队列,是一个单向链表,AQS
中的队列是CLH
变体的虚拟双向队列FIFO
ReentrantLock
、CountDownLatch
、ReentrantReadWriteLock
、Semaphore
底层都是通过Sync
继承AbstractQueuedSynchronizer
来实现的。
理解锁和同步器的关系
锁,面向锁的使用者:定义了程序员和锁交互的使用层API
,隐藏了实现细节,你调用即可。
同步器,面向锁的实现者:比如Java
并发大神DougLee
,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等
AQS
能干嘛
加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要队列
抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH
队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS
的抽象表现。它将请求共享资源的线程封装成队列的结点(Node
),通过CAS
、自旋以及LockSupport.park()
的方式,维护state
变量的状态,使并发达到同步的效果。
有阻塞就需要排队,实现排队必然需要队列
AQS
使用一个volatile
的int
类型的成员变量来表示同步状态,通过内置的FIFO
队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node
节点来实现锁的分配,通过CAS
完成对State
值的修改。
内部类Node
(Node
类在AQS
类内部)
从ReentrantLock
的原理入手解读AQS
从构造器入手:首先看看非公平锁和公平锁是怎么实现的
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock
中的lock
方法,底层是调用sync
的lock
方法,如果是非公平锁,那么lock
方法如下
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
公平锁的lock
方法如下
final void lock() {
acquire(1);
}
从上面可以看出,acquire
方法是两个lock
中最核心的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
解读上面的代码,首先是tryAcquire
,此方法是判断当前资源是否是已经抢到锁了
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 当前线程
int c = getState(); // status的状态
if (c == 0) {// 如果当前资源没人占用,强到锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果资源的线程 与 当前线程相等,那么抢到资源
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果tryAcquire
方法返回了false
,代表其没有抢到当前资源,那么会进入下一个方法addWaiter
,此方法简单来说就是将需要等待的线程加入到队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
加入队列完成之后,进行acquireQueued
方法,此方法主要做的事情就是判断当前队列中的线程,是否能抢到资源,如果不能则进入parkAndCheckInterrupt
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt
方法中使用LockSupport
来阻塞线程。
LockSupport.park(this); // 阻塞线程
并在外围是一个没有条件for (;;)
循环,来判断当前线程是否能抢夺到资源,亦或者是需要将线程从队列中移除
unlock
主要是通过sync.release(1)
、tryRelease(arg)
、unparkSuccessor
三个方法来实现的
首先是sync.release(1)
,他的底层代码如下
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
根据上述代码可以看到,判断的一句是node
的waitStatus
,简单来说,通过判断队列中node
节点的waitStatus
,来决定是否要释放这个节点。
十三、ReentrantLock
、ReentrantReadWriteLock
、StampedLock
读写锁意义和特点
读写锁ReentrantReadWriteLock
并不是真正意义上的读写分离,它只允许读读共存,而读写和写写依然是互斥的, 大多实际场景是“读/读”线程间并不存在互斥关系,只有"读/写"线程或"写/写"线程间的操作需要互斥的。因此引入ReentrantReadWriteLock
。
一个ReentrantReadWriteLock
同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁(切菜还是拍蒜选一个)。也即一个资源可以被多个读操作访问或一个写操作访问,但两者不能同时进行。
只有在读多写少情境之下,读写锁才具有较高的性能体现。
下面的代码演示了 读写互斥、读读共享的场景。
class Test {
Map<String, String> map = new HashMap<>();
Lock lock = new ReentrantLock();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key, String value) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在写入");
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
public void read(String key) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在读取");
String s = map.get(key);
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
Test test = new Test();
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
test.write(finalI + "", finalI + "");
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
test.read(finalI + "");
}, String.valueOf(i)).start();
}
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//读全部over才可以继续写
for (int i = 1; i <= 3; i++) {
int finalI = i;
new Thread(() -> {
test.write(finalI + "", finalI + "");
}, "writeThread" + String.valueOf(i)).start();
}
}
}
锁降级
简单来说,就是在已经获取到写锁的时候,可以再次获取读锁
案例:
public class LockDownGradingDemo {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
System.out.println("-------正在写入");
readLock.lock();
System.out.println("-------正在读取");
writeLock.unlock();
readLock.unlock();
}
}
写锁和读锁是互斥的
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性。因为,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作
因此,分析读写锁ReentrantReadWriteLock
,会发现它有个潜在的问题:读锁全完,写锁有望;写锁独占,读写全堵;如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即ReadWriteLock
读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁,也就是写入必须等待,这是一种悲观的读锁,o(╥﹏╥)o,人家还在读着那,你先别去写,省的数据乱。
分析StampedLock
(后面详细讲解),会发现它改进之处在于:读的过程中也允许获取写锁介入,这样会导致我们读的数据就可能不一致!所以,需要额外的方法来判断读的过程中是否有写入,这是一种乐观的读锁。 显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
邮戳锁StampedLock
StampedLock
是JDK1.8
中新增的一个读写锁,也是对JDK1.5
中的读写锁ReentrantReadWriteLock
的优化。
锁饥饿问题: ReentrantReadWriteLock
实现了读写分离,但是一旦读操作比较多的时候,想要获取写锁就变得比较困难了,假如当前1000个线程,999个读,1个写,有可能999个读取线程长时间抢到了锁,那1个写线程就悲剧了 因为当前有可能会一直存在读锁,而无法获得写锁,根本没机会写。
ReentrantReadWriteLock
:允许多个线程同时读,但是只允许一个线程写,在线程获取到写锁的时候,其他写操作和读操作都会处于阻塞状态,读锁和写锁也是互斥的,所以在读的时候是不允许写的,读写锁比传统的synchronized
速度要快很多,原因就是在于ReentrantReadWriteLock
支持读并发
StampedLock
横空出世:ReentrantReadWriteLock
的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。但是,StampedLock
采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞,这其实是对读锁的优化,所以,在获取乐观读锁后,还需要对结果进行校验
StampedLock
的特点
- 所有获取锁的方法,都返回一个邮戳(
Stamp
),Stamp
为零表示获取失败,其余都表示成功; - 所有释放锁的方法,都需要一个邮戳(
Stamp
),这个Stamp
必须是和成功获取锁时得到的Stamp
一致; StampedLock
是不可重入的,危险(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
案例演示
public class StampedLockDemo {
static int number = 37;
static StampedLock stampedLock = new StampedLock();
public void write() {
long stamp = stampedLock.writeLock();
System.out.println(Thread.currentThread().getName() + "\t" + "=====写线程准备修改");
try {
number = number + 13;
} catch (Exception e) {
e.printStackTrace();
} finally {
stampedLock.unlockWrite(stamp);
}
System.out.println(Thread.currentThread().getName() + "\t" + "=====写线程结束修改");
}
//悲观读
public void read() {
long stamp = stampedLock.readLock();
System.out.println(Thread.currentThread().getName() + "\t come in readlock block,4 seconds continue...");
//暂停几秒钟线程
for (int i = 0; i < 4; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t 正在读取中......");
}
try {
int result = number;
System.out.println(Thread.currentThread().getName() + "\t" + " 获得成员变量值result:" + result);
System.out.println("写线程没有修改值,因为 stampedLock.readLock()读的时候,不可以写,读写互斥");
} catch (Exception e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
//乐观读
public void tryOptimisticRead() {
long stamp = stampedLock.tryOptimisticRead();
int result = number;
//间隔4秒钟,我们很乐观的认为没有其他线程修改过number值,实际靠判断。
System.out.println("4秒前stampedLock.validate值(true无修改,false有修改)" + "\t" + stampedLock.validate(stamp));
for (int i = 1; i <= 4; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t 正在读取中......" + i +
"秒后stampedLock.validate值(true无修改,false有修改)" + "\t"
+ stampedLock.validate(stamp));
}
if (!stampedLock.validate(stamp)) {
System.out.println("有人动过--------存在写操作!");
stamp = stampedLock.readLock();
try {
System.out.println("从乐观读 升级为 悲观读");
result = number;
System.out.println("重新悲观读锁通过获取到的成员变量值result:" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println(Thread.currentThread().getName() + "\t finally value: " + result);
}
public static void main(String[] args) {
StampedLockDemo resource = new StampedLockDemo();
new Thread(() -> {
// resource.read(); // 悲观读, 读写互斥
resource.tryOptimisticRead(); // 乐观读,读写不互斥
}, "readThread").start();
// 2秒钟时乐观读失败,6秒钟乐观读取成功resource.tryOptimisticRead();,修改切换演示
//try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(() -> {
resource.write();
}, "writeThread").start();
}
}
StampedLock
的缺点
StampedLock
不支持重入,没有Re
开头StampedLock
的悲观读锁和写锁都不支持条件变量(Condition
),这个也需要注意。- 使用
StampedLock
一定不要调用中断操作,即不要调用interrupt()
方法如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly()
和写锁writeLockInterruptibly()
本文标签: JUC
版权声明:本文标题:JUC编程 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1726763049a1083327.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论