admin管理员组

文章数量:1649150

Synchronized是jdk字节码层面的一把可重入,不可中断互斥锁。

要想了解Synchronized,就要先了解java内存模型。

java内存模型

java内存模型分为工作内存和主内存,主内存是存放共享变量的地方,而工作内存是每个线程独有的内存空间。

Java内存模型是一套在多线程读写共享数据时,对共享数据的可见性、有序性、和原子性的规则和保障。

主内存与工作内存之间的数据交互过程

Java内存模型中定义了以下8种操作来完成,主内存与工作内存之间具体的交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步回主内存之类的实现细节,虚拟机实现时必须保证下面提及的每一种操作都是原子的、不可再分的。

主内存与工作内存之间的数据交互过程

lock -> read -> load -> use -> assign -> store -> write -> unlock
  1. 如果对一个变量执行lock操作,将会清空工作内存中此变量的值

  2. 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中

Synchronized如何保证三大特性

Synchronized保证原子性

/**
 * 原子性演示 使用五个线程交替累加1到5000
 * @Date 2023/03/27 22:27
 **/
public class Atomicity {
    private static int number = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++) {
                number++;
            }
        };

        List<Thread> list = new ArrayList<>();
        //使用五个线程交替累计
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(increment);
            thread.start();
            list.add(thread);
        }
		
        //线程同步
        for (Thread thread : list) {
            thread.join();
        }

        System.out.println(number);
    }
}

多次运行这段代码,可以看到运行结果有可能小于5000。

这是因为多线程在运行时,每个线程的工作内存中的共享变量number的副本数据值同步不及时导致的:线程1获取到number的值为1000,然后对1000++,此时工作内存中的值仍然没有正式更行到住内存中,这时线程2又对1000++,导致两次++自增操作都是对1000 + 1 = 1001,而不是1000 + 1 + 1 = 1002

此时我们使用Synchronized对代码块进行加锁,再多次运行,观察运行结果

import java.util.ArrayList;
import java.util.List;

/**
 * 原子性演示 使用五个线程交替累加到5000
 * @Date 2023/03/27 22:27
 **/
public class Atomicity {

    private static int number = 0;

    private static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        Runnable increment = () -> {
            for (int i = 0; i < 1000; i++) {
                //获取锁
                synchronized (object) {
                    number++;
                }
            }
        };

        List<Thread> list = new ArrayList<>();
        //使用五个线程交替累加
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(increment);
            thread.start();
            list.add(thread);
        }

        //线程同步
        for (Thread thread : list) {
            thread.join();
        }

        System.out.println(number);
    }
}

Synchronized保证原子性的原理

synchronized在对代码加锁后,保证同一时刻只有一个线程对number进行操作,就不会向之前那样因为同一时刻有多个线程对number进行操作而导致数据不一致,有重复累加的情况。

Synchronized保证可见性

案例演示:一个线程根据boolean类型的标记flflag, while循环,另一个线程改变这个flflag变量的值,另一个线程并不会停止循环。

/**
 * 演示可见性
 * @Date 2023/03/27 22:47
 **/
public class Visibility {

    private static boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {

            }
        }).start();

        Thread.sleep(2000);

        //在flag被更改后,观察上面的死循环会不会停止
        new Thread(() -> {
            flag = false;
            System.out.println("flag值被更改");
        }).start();
    }
}

一个线程根据boolean类型的标记flag, while循环,线程2改变这个flag变量的值,线程1并不会停止循环。

这是因为线程2更改了自己工作内存的flag值并将flag刷新到主内存后,线程1的工作内存仍然一直使用的是flag未被改变的值(原来的值),没有主动去主内存中获取flag的最新值,主内存也没有强制使线程1工作内存中的副本失效,线程1就一直这样执行下去。

此时我们使用Synchronized对代码块进行加锁,再多次运行,观察运行结果

/**
 * 演示可见性
 * @Date 2023/03/27 22:47
 **/
public class Visibility {

    /**
     * 共享变量
     */
    private static boolean flag = true;

    private static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
                synchronized (object) {
                    
                }
            }
        }).start();

        Thread.sleep(2000);

        //在flag被更改后,观察上面的死循环会不会停止
        new Thread(() -> {
            flag = false;
            System.out.println("flag值被更改");
        }).start();
    }
}

我们可以看到,当我们使用Synchronized对代码加锁,flag被更改为false后,死循环会自动停止。

Synchronized保证可见性的原理

使用Synchronized加锁以后,实际上是对主内存中的flag值进行加锁,每次线程1运行代码,都会将自己原来工作内存中的数据进行清空,然后将主内存中的最新数据读取加载到工作内存中,所以加锁以后每次循环的flag值都是主内存中的最新值,避免了因为主内存和工作内存中的数据不一致导致的继续死循环。

Synchronized保证有序性

java为什么会存在指令重排?

编译器和cpu会对代码的执行顺序进行重新编排,来提高java代码的执行效率。

as-if-serial语义

as-if-serial保证了java代码不会因为指令重排而得到不同的结果

下列情况,不能使用重排:

写后读

int a = 1;
int b = a;

读后写

int a = 1;
int b = a;
int a = 2;

写后写

int a = 2;
a = 3;
/**
 * 有序性演示
 * @Date 2023/03/28 00:18
 **/
public class Ordering {

    private static int num = 0;

    private static boolean ready = true;
    
    private static Object object = new Object();


    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (ready) {
                num = ++num;
            }
            System.out.println(num);
        }).start();

        Thread.sleep(2000);
        new Thread(() -> {
            synchronized (object) {
                ready = false;
                num = 0;
            }
        });
    }
}

多次运行以上代码,我们可以看到虽然我们无法控制指令重排,但是synchronized可以保证同一时刻只有一个线程访问同步代码块,所以synchronized可以保证线程执行的有序性,但是synchronized无法禁止指令重排。

Synchronized的特性

可重入特性

什么是可重入?

同一个线程可以多次获取同一个锁对象

/**
 * 可重入演示
 *
 * @Date 2023/03/29 21:57
 **/
public class RLock {


    private static Object object = new Object();

    public static void main(String[] args) {
        testRlock();
    }

    private static void testRlock() {
        //看是否同一个线程是否可以多次获取锁
        synchronized (object) {
            System.out.println("获取锁对象1 线程:" + Thread.currentThread().getId());
            synchronized (object) {
                System.out.println("获取锁对象2 线程:" + Thread.currentThread().getId());
            }
        }
    }
}

执行结果:

获取锁对象1 线程:1
获取锁对象2 线程:1

Process finished with exit code 0

通过运行结果我们可以看到,同一个线程可以多次获取同一个锁对象

可重入原理

Synchronized锁对象的监视器对象内部维护了一个变量recursion来记录同一个线程获取该监视器对象的次数

可重入的好处

  1. 可以有效避免因为锁竞争所导致的死锁(不是完全解决)
  2. 可以让我们更好的封装代码

不可中断特性

ReentrantLock可中断演示

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock可中断演示
 *
 * @Date 2023/03/29 22:21
 **/
public class ReenLock {

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Runnable run = () -> {
            String threadName = Thread.currentThread().getName();
            try {
                boolean flag = lock.tryLock(3, TimeUnit.SECONDS);
                if (flag) {
                    System.out.println(threadName + "获得锁");
                    Thread.sleep(10000);
                    lock.unlock();
                } else {
                    System.out.println(threadName + "获取锁失败");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };

        Thread thread1 = new Thread(run);
        //开启一个线程来执行代码块
        thread1.start();

        //主线程先休眠1秒 再启动线程2
        Thread.sleep(1000);

        //启动线程2
        Thread thread2 = new Thread(run);
        thread2.start();

        System.out.println("停止线程2前");
        thread2.interrupt();
        System.out.println("停止线程2后");

        Thread.sleep(1000);
        System.out.println("Thread1状态: " + thread1.getState());
        System.out.println("Thread2状态: " + thread2.getState());
    }
}

运行结果

Thread1状态: TIMED_WAITING
Thread2状态: TERMINATED

synchronized在锁竞争时是不可中断的,获取不到锁的线程会一直处于blocked(阻塞)状态

/**
 * 掩饰synchronized不可中断
 *
 * @Date 2023/03/29 22:08
 **/
public class Block {

    private final static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        Runnable run = () -> {
            synchronized (object) {
                System.out.println("进入同步代码块");

                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };

        Thread thread1 = new Thread(run);
        Thread thread2 = new Thread(run);
        //开启一个线程来执行代码块
        thread1.start();

        //主线程先休眠1秒 再启动线程2
        Thread.sleep(1000);

        //启动线程2
        thread2.start();

        System.out.println("停止线程2前");
        thread2.interrupt();
        System.out.println("停止线程2后");

        System.out.println("Thread1状态: " + thread1.getState());
        System.out.println("Thread2状态: " + thread2.getState());
    }
}

运行结果

进入同步代码块
限制线程2前
限制线程2后
Thread1状态: TIMED_WAITING
Thread1状态: BLOCKED
进入同步代码块

我们可以看到,synchronized获取锁失败阻塞状态的线程不可以使用interrupt()进行中断操作,而ReentrantLock获取锁失败可以被interrupt()进行中断操作

小结

不可中断是指,当一个线程获得锁后,另一个线程一直处于阻塞或等待状态,前一个线程不释放锁,后一个线程会一直阻塞或等待,不可被中断。

synchronized属于不可被中断

Lock的lock方法是不可中断的

Lock的tryLock方法是可中断的

Synchronized原理

javap 反汇编

我们编写一个简单的synchronized代码,如下:

/**
 * 演示可见性
 * @Date 2023/03/27 22:47
 **/
public class Visibility {

    /**
     * 共享变量
     */
    private static boolean flag = true;

    private final static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
                synchronized (object) {

                }
            }
        }).start();

        Thread.sleep(2000);

        //在flag被更改后,观察上面的死循环会不会停止
        new Thread(() -> {
            flag = false;
            System.out.println("flag值被更改");
        }).start();
    }
}

我们要看synchronized的原理,但是synchronized是一个关键字,看不到源码。我们可以将class文件进行反汇编。

JDK自带的一个工具: javap ,对字节码进行反汇编,查看字节码指令。

在DOS命令行输入:

javap -p -v -c Visibility.class

反汇编后的效果如下:

 private static void lambda$main$0();
    descriptor: ()V
    flags: ACC_PRIVATE, ACC_STATIC, ACC_SYNTHETIC
    Code:
      stack=2, locals=2, args_size=0
         0: getstatic     #10                 // Field flag:Z
         3: ifeq          25
         6: getstatic     #14                 // Field object:Ljava/lang/Object;
         9: dup
        10: astore_0
        11: monitorenter   //进入监视器         
        12: aload_0
        13: monitorexit    //释放监视器
        14: goto          22
        17: astore_1
        18: aload_0
        19: monitorexit
        20: aload_1
        21: athrow
        22: goto          0
        25: return
      Exception table:
         from    to  target type
            12    14    17   any
            17    20    17   any
      LineNumberTable:
        line 16: 0
        line 17: 6
        line 19: 12
        line 21: 25

我们看到有个monitorenter指令和monitorexit指令

monitorenter

首先我们来看一下JVM规范中对于monitorenter的描述:

https://docs.oracle/javase/specs/jvms/se8/html/jvms-6.html#jvms-6.5.monitorenter

每一个java对象都有一个monitor监视器对象存储该对象的对象头中,monitor被占用时,监视器被占用时会被锁住,其他线程无法来获

取该monitor。 当JVM执行某个线程的某个方法内部的monitorenter时,它会尝试去获取当前对象对应的monitor的所有权,monitor中维护了两个值,一个是owner,一个是recursion,owner代表拥有该锁对象的线程,recursion代表获取该锁的次数。其过程如下:

  1. 若recursion的进入数为0,那么代表给锁对象没有被其他线程持有,线程可以获取该锁,并将owner设置为自己,然后将recursion设置为1,当前线程成为锁的持有者
  2. 若recursion不为0,那么就去判断owner是否等于当前线程,若是,那么属于重入,将recursion+1
  3. 若都不满足1、2条件,那么代表该锁已经被其他线程持持有,当前线程只能够一直处于阻塞状态,只有recursion的值为0,才能重新尝试获取锁

monitorexit

首先我们来看一下JVM规范中对于monitorexit的描述:

  1. 能执行monitorexit指令的线程一定是持有锁的线程
  2. 执行一次monitorexit,就将recursion的值减1,若recursion的值为0,那么代表该锁被释放,其他线程可以来尝试获取该锁

monitorexit插入在方法结束处和异常处,JVM保证每个monitorenter必须有对应的monitorexit,这也就是为什么发生异常时,Synchronized会自动释放锁的原因

上面的都是同步代码块,那么同步方法是怎么加锁的呢?

我们使用javap命令反编译以下代码

public static void main(String[] args) throws InterruptedException {
        test();
    }

    public synchronized static void test() throws InterruptedException {
        new Thread(() -> {
            while (flag) {
                System.out.println("获取到锁");
            }
        }).start();

        Thread.sleep(2000);

        //在flag被更改后,观察上面的死循环会不会停止
        new Thread(() -> {
            flag = false;
            System.out.println("flag值被更改");
        }).start();
    }

得到以下结果

  private static void lambda$test$0();
    descriptor: ()V
    flags: ACC_PRIVATE, ACC_STATIC, ACC_SYNTHETIC  #// 隐式加monitorenter monitorexit
    Code:
      stack=2, locals=0, args_size=0
         0: getstatic     #11                 // Field flag:Z
         3: ifeq          17
         6: getstatic     #12                 // Field java/lang/System.out:Ljava/io/PrintStream;
         9: ldc           #15                 // String 获取到锁
        11: invokevirtual #14                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        14: goto          0
        17: return
      LineNumberTable:
        line 20: 0
        line 21: 6
        line 23: 17
      StackMapTable: number_of_entries = 2
        frame_type = 0 /* same */
        frame_type = 16 /* same */

  static {};
    descriptor: ()V
    flags: ACC_STATIC
    Code:
      stack=2, locals=0, args_size=0
         0: iconst_1
         1: putstatic     #11                 // Field flag:Z
         4: new           #16                 // class java/lang/Object
         7: dup
         8: invokespecial #1                  // Method java/lang/Object."<init>":()V
        11: putstatic     #17                 // Field object:Ljava/lang/Object;
        14: return
      LineNumberTable:
        line 10: 0
        line 12: 4
}

虽然方法体中没有monitorenter monitorexit,但是可以看到同步方法在反汇编后,会增加 ACC_SYNCHRONIZED 修饰。会隐式调用monitorenter和monitorexit。在执行同步方法前会调用monitorenter,在执行完同步方法后会调用monitorexit。最后都能达到加锁的效果。

synchronized与Lock区别

  1. synchronized是关键字,而Lock是一个接口。
  2. synchronized会自动释放锁,而Lock必须手动释放锁。
  3. synchronized是不可中断的,Lock可以中断也可以不中断。
  4. 通过Lock可以知道线程有没有拿到锁,而synchronized不能。
  5. synchronized能锁住方法和代码块,而Lock只能锁住代码块。
  6. Lock可以使用读锁提高多线程读效率。
  7. synchronized是非公平锁,ReentrantLock可以控制是否是公平锁。

深入JVM源码

JVM源码下载

http://openjdk.java/ --> Mercurial --> jdk8 --> hotspot --> zip

monitor监视器锁

可以看出无论是synchronized代码块还是synchronized方法,其线程安全的语义实现最终依赖一个叫monitor的东西,那么这个神秘的东西是什么呢?下面让我们来详细介绍一下。

在HotSpot虚拟机中,monitor是由ObjectMonitor实现的。其源码是用c++来实现的,位于HotSpot虚拟机源码ObjectMonitor.hpp文件中(src/share/vm/runtime/objectMonitor.hpp)。ObjectMonitor主要数据结构如下:

  ObjectMonitor() {
    _header       = NULL;
    _count        = 0;
    _waiters      = 0,
    _recursions   = 0;      //线程重入次数
    _object       = NULL;   //指向对应的java对象
    _owner        = NULL;   //拥有该monitor的线程
    _WaitSet      = NULL;   //处于wait状态的线,会被加入到这个队列中
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;  //竞争锁的线程会被加入到这个队列中
    FreeNext      = NULL ;
    _EntryList    = NULL ;  //处于blocked阻塞状态的线程,会被加入到这个队列中
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
    _previous_owner_tid = 0;
  }
  1. _owner:初始值为null。当有线程占有该monitor时,owner标记为该线程的唯一标识。当线程释放monitor时,owner又恢复为nunn,owner是一个临界资源,jvm是通过CAS操作来保证其线程安全的
  2. _cxq:竞争队列,所有请求锁的线程首先会被放在这个队列中(单向链表)。_cxq是一个临界资源,jvm通过CAS原子指令来修改_cxq队列。修改前cxq的旧值填入了node的next字段,cxq指向新值。因此,cxq是一个先进后出的stack(栈)。
  3. _EntryList:cxq队列中有资格成为候选资源的线程会被移动到该队列中,是一个专门存储blocked阻塞状态线程的队列。
  4. _WaitSet:因为调用wait方法而被阻塞的线程会被放在该队列中。

每一个Java对象都可以与一个监视器monitor关联,我们可以把它理解成为一把锁,当一个线程想要执行一段被synchronized圈起来的同步方法或者代码块时,该线程得先获取到synchronized修饰的对象对应的monitor。我们的Java代码里不会显示地去创造这么一个monitor对象,我们也无需创建,事实上可以这么理解:monitor并不是随着对象创建而创建的。我们是通过synchronized修饰符告诉JVM需要为我们的某个对象创建关联的monitor对象。每个线程都存在两个ObjectMonitor对象列表,分别为free和used列表。同时JVM中也维护着global locklist。当线程需要ObjectMonitor对象时,首先从线程自身的free表中申请,若存在则使用,若不存在则从global list中申请。

ObjectMonitor的数据结构中包含:owner、_WaitSet和_EntryList,它们之间的关系转换可以用下图表示:

monitor竞争

  1. 执行monitorenter时,会调用InterpreterRuntime.cpp

    位于:src/share/vm/interpreter/interpreterRuntime.cpp) 的 InterpreterRuntime::monitorenter函数。具体代码可参见HotSpot源码。

    IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
    #ifdef ASSERT
      thread->last_frame().interpreter_frame_verify_monitor(elem);
    #endif
      if (PrintBiasedLockingStatistics) {
        Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
      }
      Handle h_obj(thread, elem->obj());
      assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
             "must be NULL or an object");
      //偏向锁
      if (UseBiasedLocking) {
        // Retry fast entry if bias is revoked to avoid unnecessary inflation
        ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
      } else {
          //重量级锁
        ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
      }
      assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
             "must be NULL or an object");
    #ifdef ASSERT
      thread->last_frame().interpreter_frame_verify_monitor(elem);
    #endif
    IRT_END
    
  2. 对于重量级锁,monitorenter函数中会调用 ObjectSynchronizer::slow_enter

  3. 最终调用 ObjectMonitor::enter(位于:src/share/vm/runtime/objectMonitor.cpp),源码如下:

    void ATTR ObjectMonitor::enter(TRAPS) {
      // The following code is ordered to check the most common cases first
      // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
      Thread * const Self = THREAD ;
      void * cur ;
    
      //通过系统内核函数将owner赋值为当前线程 并返回之前占用锁线程
      cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
      if (cur == NULL) {
         // Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
         assert (_recursions == 0   , "invariant") ;
         assert (_owner      == Self, "invariant") ;
         // CONSIDER: set or assert OwnerIsThread == 1
         return ;
      }
      
      //若之前占用锁线程 == 当前线程 则代表是重入 将crecursions++ 然后return
      if (cur == Self) {
         // TODO-FIXME: check for integer overflow!  BUGID 6557169.
         _recursions ++ ;
         return ;
      }
    
      //若当前线程是第一次进入进入monitor,则将当前recursion赋值为1 将owner赋值为当前线程
      if (Self->is_lock_owned ((address)cur)) {
        assert (_recursions == 0, "internal state error");
        _recursions = 1 ;
        // Commute owner from a thread-specific on-stack BasicLockObject address to
        // a full-fledged "Thread *".
        _owner = Self ;
        OwnerIsThread = 1 ;
        return ;
      }
    
      //<-------------------- owner设置失败 -------------------->
        //使用死循环,将当前线程设置到cxq队列中,然后使用自旋的方式尝试获取锁,若还是获取锁失败,则将当前线程挂起
        for (;;) {
          jt->set_suspend_equivalent();
    
          EnterI (THREAD) ;
    
          if (!ExitSuspendEquivalent(jt)) break ;
    
              _recursions = 0 ;
          _succ = NULL ;
          exit (false, Self) ;
    
          jt->java_suspend_self();
        }
        Self->set_current_pending_monitor(NULL);
      }
    

    以上代码的具体流程概括如下:

    1. 使用CAS的方式将monitor的owner设置为当前线程
    2. 若设置之前的线程等于当前线程,则代表是重入,将recursions++
    3. 若当前线程是第一次进入monitor,那么将owner设置为当前线程,然后将recursion赋值为1
    4. 若当前线程设置owner失败,将当前线程挂起,等待monitor释放时被唤醒重新竞争monitor

monitor等待

竞争失败等待调用的是ObjectMonitor对象的EnterI方法

(位于:src/share/vm/runtime/objectMonitor.cpp),源码如下所示:

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    //TryLock尝试获取锁
    if (TryLock (Self) > 0) {
        assert (_succ != Self              , "invariant") ;
        assert (_owner == Self             , "invariant") ;
        assert (_Responsible != Self       , "invariant") ;
        return ;
    }

    DeferredInitialize () ;

    //自旋获取锁
    if (TrySpin (Self) > 0) {
        assert (_owner == Self        , "invariant") ;
        assert (_succ != Self         , "invariant") ;
        assert (_Responsible != Self  , "invariant") ;
        return ;
    }

    // The Spin failed -- Enqueue and park the thread ...
    assert (_succ  != Self            , "invariant") ;
    assert (_owner != Self            , "invariant") ;
    assert (_Responsible != Self      , "invariant") ;

    //将当前线程封装成objectMonitor类型节点 并将当前node类型设置为TS_CXQ
    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    //使用死循环+CAS的方式将当初线程加入_cxq队列中
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        //再次tryLock获取锁
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }

    for (;;) {
        //再次尝试获取锁
        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) {
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }

        //将当前线程挂起
        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            // Increase the RecheckInterval, but clamp the value.
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        //被唤醒成功后,尝试获取锁
        if (TryLock(Self) > 0) break ;

        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) {
           ObjectMonitor::_sync_FutileWakeups->inc() ;
        }
        ++ nWakeups ;
        //尝试自旋获取锁
        if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;



        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
           Self->_ParkEvent->reset() ;
           OrderAccess::fence() ;
        }
        if (_succ == Self) _succ = NULL ;

        OrderAccess::fence() ;
    }

当该线程被唤醒时,会从挂起的点继续执行,通过 ObjectMonitor::TryLock 尝试获取锁,TryLock方法实现如下:

int ObjectMonitor::TryLock (Thread * Self) {
   for (;;) {
      void * own = _owner ;
      if (own != NULL) return 0 ;
      if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
         // Either guarantee _recursions == 0 or set _recursions = 0.
         assert (_recursions == 0, "invariant") ;
         assert (_owner == Self, "invariant") ;
         // CONSIDER: set or assert that OwnerIsThread == 1
         return 1 ;
      }
      // The lock had been free momentarily, but we lost the race to the lock.
      // Interference -- the CAS failed.
      // We can either return -1 or retry.
      // Retry doesn't make as much sense because the lock was just acquired.
      if (true) return -1 ;
   }
}

以上代码的具体流程概括如下:

  1. 通过自旋的方式获取锁
  2. 获取锁失败,将当前线程封装成objectWaiter类型的node,并当node状态设置为TS_CXQ,存到_cxq队列
  3. 一边自旋尝试获取锁,一边尝试将当前线程park挂起,直到获取锁成功或挂起成功
  4. 当线程被唤醒时,会从挂起的以自旋的方式使用tryLock()方法获取锁

monitor释放

当某个持有锁的线程执行完同步代码块时,会进行锁的释放,给其它线程机会执行同步代码,在HotSpot中,通过退出monitor的方式实现锁的释放,并通知被阻塞的线程。具体实现位于ObjectMonitor的exit方法中。

(位于:src/share/vm/runtime/objectMonitor.cpp),源码如下所示:

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;

   //若_recursions != 0,代表仍然存在重入,需要等待_recursions完全释放(_recursions == 0)
   if (_recursions != 0) {
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   }

   for (;;) {
      assert (THREAD == _owner, "invariant") ;

      ObjectWaiter * w = NULL ;
      //获取唤醒模式
      int QMode = Knob_QMode ;
      //直接对_cxq队列中的线程进行唤醒
      if (QMode == 2 && _cxq != NULL) {
          w = _cxq ;
          assert (w != NULL, "invariant") ;
          assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      //将_cxq队列中的node追加到_EntryList队列的尾部 然后_EntryList队列对进行唤醒
      if (QMode == 3 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Append the RATs to the EntryList
          // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
          ObjectWaiter * Tail ;
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;
          } else {
              Tail->_next = w ;
              w->_prev = Tail ;
          }

          // Fall thru into code that tries to wake a successor from EntryList
      }

       //将_cxq队列中的node插入到_EntryList队列的头部 然后_EntryList队列对进行唤醒
      if (QMode == 4 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Prepend the RATs to the EntryList
          if (_EntryList != NULL) {
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;

          // Fall thru into code that tries to wake a successor from EntryList
      }

      w = _EntryList  ;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
      w = _cxq ;
      if (w == NULL) continue ;
      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      TEVENT (Inflated exit - drain cxq into EntryList) ;

      assert (w != NULL              , "invariant") ;
      assert (_EntryList  == NULL    , "invariant") ;
      
      //把_cxq倾倒入_EntryList 然后逆序排列
      if (QMode == 1) {
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
         // QMode == 0 or QMode == 2
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }
      
      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          //进行唤醒操作
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

以上代码的具体流程概括如下:

  1. 退出同步方法(同步代码块),会让recursion - 1,当recursion的值为0时,说明释放了monitor
  2. 根据不同的策略(由QMode)指定,从cxq或EntryList中获取头节点,通过ObjectMonitor::ExitEpilog方法进行唤醒线程,唤醒操作最终由unpark完成,实现如下:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;


   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;

   Wakee  = NULL ;

   // Drop the lock
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                               // ST _owner vs LD in unpark()

   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }

   DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
   Trigger->unpark() ;// 唤醒之前被pack()挂起的线程

   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}

monitor是重量级锁

可以看到ObjectMonitor的函数调用中会涉及到Atomic::cmpxchg_ptr,Atomic::inc_ptr等内核函数,执行同步代码块,没有竞争到锁的对象会park()被挂起,竞争到锁的线程会unpark()唤醒。这个时候就会存在操作系统用户态和内核态的转换,这种切换会消耗大量的系统资源。所以synchronized是Java语言中是一个重量级(Heavyweight)的操作。用户态和和内核态是什么东西呢?要想了解用户态和内核态还需要先了解一下Linux系统的体系架构:

从上图可以看出,Linux操作系统的体系架构分为:用户空间(应用程序的活动空间)和内核。

内核:本质上可以理解为一种软件,控制计算机的硬件资源,并提供上层应用程序运行的环境。

用户空间:上层应用程序活动的空间。应用程序的执行必须依托于内核提供的资源,包括CPU资源、存储资源、I/O资源等。

系统调用:为了使上层应用能够访问到这些资源,内核必须为上层应用提供访问的接口:即系统调用。

所有进程初始都运行于用户空间,此时即为用户运行状态(简称:用户态);但是当它调用系统调用执行某些操作时,例如 I/O调用,此时需要陷入内核中运行,我们就称进程处于内核运行态(或简称为内核态)。 系统调用的过程可以简单理解为:

  1. 用户态程序将一些数据值放在寄存器中, 或者使用参数创建一个堆栈, 以此表明需要操作系统提供的服务。
  2. 用户态程序执行系统调用。
  3. CPU切换到内核态,并跳到位于内存指定位置的指令。
  4. 系统调用处理器(system call handler)会读取程序放入内存的数据参数,并执行程序请求的服务。
  5. 系统调用完成后,操作系统会重置CPU为用户态并返回系统调用的结果。

由此可见用户态切换至内核态需要传递许多变量,同时内核还需要保护好用户态在切换时的一些寄存器值、变量等,以备内核态切换回用户态。这种切换就带来了大量的系统资源消耗,这就是在synchronized未优化之前,效率低的原因。

JDK6 synchronized优化

无锁

java对象默认是无锁的

偏向锁(适用于单线程)

偏向锁原理

偏向锁的“偏”,就是偏心的”偏“,偏袒的”偏“。他的意思是这个锁会偏向于上一个获得它的线程。

  1. 会在java对象头中存储获得当前锁线程id。

  2. 在下一个线程来竞争这个锁时,判断当前锁级别是否为偏向锁

  3. 若是偏向锁,则直接拿当前线程id与java对象头中存储的上一个获取该锁的线程id做比较,若相等,则可以获取该锁,若不相等,则不能获取改锁,不会走其他获取锁流程。

  4. 若不是偏向锁,则不走其他获取锁的流程

    偏向锁是为单线程场景设计的,减少了获取锁的开销,大大增加了单线程获取锁的效率

当线程第一次访问同步块并获取锁时,偏向锁处理流程如下:

  1. 虚拟机会将对象头的标志位设置为“01”,即偏向锁模式
  2. 同时使用CAS将Thread Id记录到对象头中,如果CAS操作成功,那么持有偏向锁的线程以后每次进入这个锁的同步代码时,虚拟机都可以不用进行任何同步操作,偏向锁的效率更高。
偏向锁的撤销
  1. 暂停拥有偏向锁的线程(获取全局安全点),获取被锁定的锁对象

  2. 撤销偏向锁,恢复到无锁(标志位为01)或者轻量级锁(标志位为00)的状态

    偏向锁在Java 6之后是默认启用的,但在应用程序启动几秒钟之后才激活,可以使用 -XX:BiasedLockingStartupDelay=0 参数关闭延迟

    如果确定应用程序中所有锁通常情况下处于竞争状态,可以通过 XX:-UseBiasedLocking=false 参数关闭偏向锁。

偏向锁好处

偏向锁是在只有一个线程执行同步块时进一步提高性能,适用于一个线程反复获得同一锁的情况。偏向锁可以提高带有同步但无竞争的程序性能。

它同样是一个带有效益权衡性质的优化,也就是说,它并不一定总是对程序运行有利,如果程序中大多数的锁总是被多个不同的线程访问比如线程池,那偏向模式就是多余的。

小结
偏向锁的原理是什么?

当锁对象第一次被线程获取的时候,虚拟机将会把对象头中的标志位设为“01”,即偏向模式。同时使用CAS操作把获取到这个锁的线程的ID记录在对象的Mark Word之中 ,如果CAS操作成功,持有偏向锁的线程以后每次进入这个锁相关的同步块时,虚拟机都可以不再进行任何同步操作,偏向锁的效率高。

偏向锁的好处是什么?

偏向锁是在只有一个线程执行同步块时进一步提高性能,适用于一个线程反复获得同一锁的情况。偏向锁可以提高带有同步但无竞争的程序性能。

轻量级锁(适用于多线程交替执行)

什么是轻量级锁

轻量级锁是JDK6与引入的,它名字中的“轻量级”是相对于使用monitor的传统锁而言的,因此传统的锁机制就称为“重量级”锁。

引入轻量级锁的目的:

在多线程交替执行同步块的情况下,尽量避免重量级锁引起的性能消耗,但是如果多个线程在同一时刻进入临界区,会导致轻量级锁膨胀升级重量级锁,所以轻量级锁的出现并非是要替代重量级锁。

轻量级锁原理

当撤销偏向锁或者有多线程竞争偏向锁时候,就会将锁升级为轻量级锁,其步骤如下:

  1. 判断当前对象是否无锁,如果是,jvm会在当前线程的栈帧中建立一份名为Lock Record的空间,然后将对象头中的数据复制到这份空间中,然后将lock record中的owner指向对象头。
  2. jvm尝试利用CAS将对象头的指针指向当前线程栈帧中的lock record,若成功,则代表获取锁成功,将锁标志位变成00
  3. 若失败,则判断对象头中的指针是否已经指向了当前线程中的栈帧,若是,那么就代表是锁重入,直接执行同步代码块,若不是,那么使用自适应自旋进行重新尝试

轻量级锁的释放
  1. 因为当前线程栈帧lock record中存储的是锁对象无锁的数据,那么在锁释放的时候,需要以CAS的方式将当前线程栈帧lock record中无锁的数据重新写回到对象头的mark word中
  2. 如果成功,则说明释放锁成功。
  3. 如果CAS操作替换失败,说明有其他线程尝试获取该锁,则需要将轻量级锁需要膨胀升级为重量级锁。

锁消除

关于锁消除,我们来看下面一段代码:

    public synchronized RLock getRlock() {
        return new RLock();
    }

这个同步方法是获取一个Rlock对象,当我们使用多线程调用这个方法时,每次得到的对象都不一样,所以不存在共享数据的问题,也就不存在锁竞争。

虚拟机即时编译器(JIT)在运行时,会自动检测到不可能存在共享数据竞争的锁进行锁消除。

锁消除后的效果:

public RLock getRlock() {
    return new RLock();
}

锁粗化

原则上,我们在编写代码的时候,总是推荐将同步块的作用范围限制得尽量小,只在共享数据的实际作用域中才进行同步,这样是为了使得需要同步的操作数量尽可能变小,如果存在锁竞争,那等待锁的线程也能尽快拿到锁。大部分情况下,上面的原则都是正确的,但是如果一系列的连续操作都对同一个对象反复加锁和解锁,甚至加锁操作是出现在循环体中的,那即使没有线程竞争,频繁地进行互斥同步操作也会导致不必要的性能损耗。

public void test() {
    for (int i = 0; i < 100; i++) {
        synchronized (object) {
            i++;
        }
    }
}

我们看以上方法,在for循环中,synchronized保证每个i++操作都是原子性的。但是以上的方法有个问题,就是在每次循环都会加锁,开销大,效率低。

虚拟机即时编译器(JIT)在运行时,会自动根据synchronized的影响范围进行锁粗化优化。

优化后代码:

public void test() {
    synchronized (object) {
        for (int i = 0; i < 100; i++) {
            i++;
        }
    }
}
轻量级锁的释放
  1. 因为当前线程栈帧lock record中存储的是锁对象无锁的数据,那么在锁释放的时候,需要以CAS的方式将当前线程栈帧lock record中无锁的数据重新写回到对象头的mark word中
  2. 如果成功,则说明释放锁成功。
  3. 如果CAS操作替换失败,说明有其他线程尝试获取该锁,则需要将轻量级锁需要膨胀升级为重量级锁。

锁消除

关于锁消除,我们来看下面一段代码:

    public synchronized RLock getRlock() {
        return new RLock();
    }

这个同步方法是获取一个Rlock对象,当我们使用多线程调用这个方法时,每次得到的对象都不一样,所以不存在共享数据的问题,也就不存在锁竞争。

虚拟机即时编译器(JIT)在运行时,会自动检测到不可能存在共享数据竞争的锁进行锁消除。

锁消除后的效果:

public RLock getRlock() {
    return new RLock();
}

锁粗化

原则上,我们在编写代码的时候,总是推荐将同步块的作用范围限制得尽量小,只在共享数据的实际作用域中才进行同步,这样是为了使得需要同步的操作数量尽可能变小,如果存在锁竞争,那等待锁的线程也能尽快拿到锁。大部分情况下,上面的原则都是正确的,但是如果一系列的连续操作都对同一个对象反复加锁和解锁,甚至加锁操作是出现在循环体中的,那即使没有线程竞争,频繁地进行互斥同步操作也会导致不必要的性能损耗。

public void test() {
    for (int i = 0; i < 100; i++) {
        synchronized (object) {
            i++;
        }
    }
}

我们看以上方法,在for循环中,synchronized保证每个i++操作都是原子性的。但是以上的方法有个问题,就是在每次循环都会加锁,开销大,效率低。

虚拟机即时编译器(JIT)在运行时,会自动根据synchronized的影响范围进行锁粗化优化。

优化后代码:

public void test() {
    synchronized (object) {
        for (int i = 0; i < 100; i++) {
            i++;
        }
    }
}

本文标签: 底层synchronized